8 import socket as _orig_sock
12 from eventlet import event, greenio, debug
13 from eventlet.hubs import get_hub
14 from eventlet.green import select, socket, time, ssl
15 from eventlet.support import get_errno, six
17 LimitedTestCase, main,
18 skip_with_pyevent, skipped, skip_if, skip_on_windows,
26 def bufsized(sock, size=1):
27 """ Resize both send and receive buffers on a socket.
28 Useful for testing trampoline. Returns the socket.
31 >>> sock = bufsized(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
33 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
34 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
39 """Return the minimum buffer size that the platform supports."""
40 test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41 test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
42 return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
45 def using_epoll_hub(_f):
47 return 'epolls' in type(get_hub()).__module__
52 def using_kqueue_hub(_f):
54 return 'kqueue' in type(get_hub()).__module__
59 class TestGreenSocket(LimitedTestCase):
60 def assertWriteToClosedFileRaises(self, fd):
61 if sys.version_info[0] < 3:
62 # 2.x socket._fileobjects are odd: writes don't check
63 # whether the socket is closed or not, and you get an
64 # AttributeError during flush if it is closed
66 self.assertRaises(Exception, fd.flush)
68 # 3.x io write to closed file-like object raises ValueError
69 self.assertRaises(ValueError, fd.write, b'a')
71 def test_connect_timeout(self):
72 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74 gs = greenio.GreenSocket(s)
76 gs.connect(('192.0.2.1', 80))
77 self.fail("socket.timeout not raised")
78 except socket.timeout as e:
79 assert hasattr(e, 'args')
80 self.assertEqual(e.args[0], 'timed out')
81 except socket.error as e:
82 # unreachable is also a valid outcome
83 if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
86 def test_accept_timeout(self):
87 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
92 gs = greenio.GreenSocket(s)
95 self.fail("socket.timeout not raised")
96 except socket.timeout as e:
97 assert hasattr(e, 'args')
98 self.assertEqual(e.args[0], 'timed out')
100 def test_connect_ex_timeout(self):
101 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
103 gs = greenio.GreenSocket(s)
104 e = gs.connect_ex(('192.0.2.1', 80))
105 if not e in (errno.EHOSTUNREACH, errno.ENETUNREACH):
106 self.assertEqual(e, errno.EAGAIN)
108 def test_recv_timeout(self):
109 listener = greenio.GreenSocket(socket.socket())
110 listener.bind(('', 0))
116 # accept the connection in another greenlet
117 sock, addr = listener.accept()
120 gt = eventlet.spawn(server)
122 addr = listener.getsockname()
124 client = greenio.GreenSocket(socket.socket())
125 client.settimeout(0.1)
131 self.fail("socket.timeout not raised")
132 except socket.timeout as e:
133 assert hasattr(e, 'args')
134 self.assertEqual(e.args[0], 'timed out')
139 def test_recvfrom_timeout(self):
140 gs = greenio.GreenSocket(
141 socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
147 self.fail("socket.timeout not raised")
148 except socket.timeout as e:
149 assert hasattr(e, 'args')
150 self.assertEqual(e.args[0], 'timed out')
152 def test_recvfrom_into_timeout(self):
153 buf = array.array('B')
155 gs = greenio.GreenSocket(
156 socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
161 gs.recvfrom_into(buf)
162 self.fail("socket.timeout not raised")
163 except socket.timeout as e:
164 assert hasattr(e, 'args')
165 self.assertEqual(e.args[0], 'timed out')
167 def test_recv_into_timeout(self):
168 buf = array.array('B')
170 listener = greenio.GreenSocket(socket.socket())
171 listener.bind(('', 0))
177 # accept the connection in another greenlet
178 sock, addr = listener.accept()
181 gt = eventlet.spawn(server)
183 addr = listener.getsockname()
185 client = greenio.GreenSocket(socket.socket())
186 client.settimeout(0.1)
191 client.recv_into(buf)
192 self.fail("socket.timeout not raised")
193 except socket.timeout as e:
194 assert hasattr(e, 'args')
195 self.assertEqual(e.args[0], 'timed out')
200 def test_send_timeout(self):
201 self.reset_timeout(2)
202 listener = bufsized(eventlet.listen(('', 0)))
207 # accept the connection in another greenlet
208 sock, addr = listener.accept()
209 sock = bufsized(sock)
212 gt = eventlet.spawn(server)
214 addr = listener.getsockname()
216 client = bufsized(greenio.GreenSocket(socket.socket()))
219 client.settimeout(0.00001)
220 msg = b"A" * 100000 # large enough number to overwhelm most buffers
223 # want to exceed the size of the OS buffer so it'll block in a
226 total_sent += client.send(msg)
227 self.fail("socket.timeout not raised")
228 except socket.timeout as e:
229 assert hasattr(e, 'args')
230 self.assertEqual(e.args[0], 'timed out')
235 def test_sendall_timeout(self):
236 listener = greenio.GreenSocket(socket.socket())
237 listener.bind(('', 0))
243 # accept the connection in another greenlet
244 sock, addr = listener.accept()
247 gt = eventlet.spawn(server)
249 addr = listener.getsockname()
251 client = greenio.GreenSocket(socket.socket())
252 client.settimeout(0.1)
256 msg = b"A" * (8 << 20)
258 # want to exceed the size of the OS buffer so it'll block
260 self.fail("socket.timeout not raised")
261 except socket.timeout as e:
262 assert hasattr(e, 'args')
263 self.assertEqual(e.args[0], 'timed out')
268 def test_close_with_makefile(self):
269 def accept_close_early(listener):
270 # verify that the makefile and the socket are truly independent
271 # by closing the socket prior to using the made file
273 conn, addr = listener.accept()
274 fd = conn.makefile('w')
278 self.assertWriteToClosedFileRaises(fd)
279 self.assertRaises(socket.error, conn.send, b'b')
283 def accept_close_late(listener):
284 # verify that the makefile and the socket are truly independent
285 # by closing the made file and then sending a character
287 conn, addr = listener.accept()
288 fd = conn.makefile('w')
293 self.assertWriteToClosedFileRaises(fd)
294 self.assertRaises(socket.error, conn.send, b'b')
298 def did_it_work(server):
299 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
300 client.connect(('127.0.0.1', server.getsockname()[1]))
301 fd = client.makefile()
303 assert fd.readline() == b'hello\n'
304 assert fd.read() == b''
307 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
308 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
309 server.bind(('0.0.0.0', 0))
311 killer = eventlet.spawn(accept_close_early, server)
315 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
317 server.bind(('0.0.0.0', 0))
319 killer = eventlet.spawn(accept_close_late, server)
323 def test_del_closes_socket(self):
324 def accept_once(listener):
325 # delete/overwrite the original conn
326 # object, only keeping the file object around
327 # closing the file object should close everything
329 conn, addr = listener.accept()
330 conn = conn.makefile('w')
331 conn.write(b'hello\n')
334 self.assertWriteToClosedFileRaises(conn)
338 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
339 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
340 server.bind(('127.0.0.1', 0))
342 killer = eventlet.spawn(accept_once, server)
343 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
344 client.connect(('127.0.0.1', server.getsockname()[1]))
345 fd = client.makefile()
347 assert fd.read() == b'hello\n'
348 assert fd.read() == b''
352 def test_full_duplex(self):
353 large_data = b'*' * 10 * min_buf_size()
354 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
355 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
356 listener.bind(('127.0.0.1', 0))
360 def send_large(sock):
361 sock.sendall(large_data)
363 def read_large(sock):
364 result = sock.recv(len(large_data))
365 while len(result) < len(large_data):
366 result += sock.recv(len(large_data))
367 self.assertEqual(result, large_data)
370 (sock, addr) = listener.accept()
371 sock = bufsized(sock)
372 send_large_coro = eventlet.spawn(send_large, sock)
374 result = sock.recv(10)
375 expected = b'hello world'
376 while len(result) < len(expected):
377 result += sock.recv(10)
378 self.assertEqual(result, expected)
379 send_large_coro.wait()
381 server_evt = eventlet.spawn(server)
382 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
383 client.connect(('127.0.0.1', listener.getsockname()[1]))
385 large_evt = eventlet.spawn(read_large, client)
387 client.sendall(b'hello world')
392 def test_sendall(self):
393 # test adapted from Marcus Cavanaugh's email
394 # it may legitimately take a while, but will eventually complete
398 def test_sendall_impl(many_bytes):
399 bufsize = max(many_bytes // 15, 2)
401 def sender(listener):
402 (sock, addr) = listener.accept()
403 sock = bufsized(sock, size=bufsize)
404 sock.sendall(b'x' * many_bytes)
405 sock.sendall(b'y' * second_bytes)
407 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
408 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
409 listener.bind(("", 0))
411 sender_coro = eventlet.spawn(sender, listener)
412 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
413 client.connect(('127.0.0.1', listener.getsockname()[1]))
414 bufsized(client, size=bufsize)
416 while total < many_bytes:
417 data = client.recv(min(many_bytes - total, many_bytes // 10))
423 while total < second_bytes:
424 data = client.recv(second_bytes)
432 for how_many in (1000, 10000, 100000, 1000000):
433 test_sendall_impl(how_many)
435 def test_wrap_socket(self):
436 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
437 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
438 sock.bind(('127.0.0.1', 0))
440 ssl.wrap_socket(sock)
442 def test_timeout_and_final_write(self):
443 # This test verifies that a write on a socket that we've
444 # stopped listening for doesn't result in an incorrect switch
445 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
446 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
447 server.bind(('127.0.0.1', 0))
449 bound_port = server.getsockname()[1]
452 s2, addr = server.accept()
453 wrap_wfile = s2.makefile('w')
456 wrap_wfile.write(b'hi')
458 evt.send(b'sent via event')
461 eventlet.spawn(sender, evt)
462 # lets the socket enter accept mode, which
463 # is necessary for connect to succeed on windows
466 # try and get some data off of this pipe
467 # but bail before any is sent
468 eventlet.Timeout(0.01)
469 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
470 client.connect(('127.0.0.1', bound_port))
471 wrap_rfile = client.makefile()
474 except eventlet.TimeoutError:
478 self.assertEqual(result, b'sent via event')
483 def test_raised_multiple_readers(self):
484 debug.hub_prevent_multiple_readers(True)
486 def handle(sock, addr):
489 raise eventlet.StopServe()
491 listener = eventlet.listen(('127.0.0.1', 0))
492 eventlet.spawn(eventlet.serve, listener, handle)
497 s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
498 a = eventlet.spawn(reader, s)
500 self.assertRaises(RuntimeError, s.recv, 1)
505 @skip_if(using_epoll_hub)
506 @skip_if(using_kqueue_hub)
507 def test_closure(self):
508 def spam_to_me(address):
509 sock = eventlet.connect(address)
512 sock.sendall(b'hello world')
513 except socket.error as e:
514 if get_errno(e) == errno.EPIPE:
518 server = eventlet.listen(('127.0.0.1', 0))
519 sender = eventlet.spawn(spam_to_me, server.getsockname())
520 client, address = server.accept()
526 data = client.recv(1024)
528 except socket.error as e:
529 # we get an EBADF because client is closed in the same process
530 # (but a different greenthread)
531 if get_errno(e) != errno.EBADF:
537 reader = eventlet.spawn(reader)
538 eventlet.spawn_n(closer)
def test_invalid_connection(self):
    """Connecting to a local port that has no listener raises ``socket.error``."""
    # Let the OS pick a port by binding to port 0, remember the number, and
    # close the listener again so nothing is accepting on that port.
    probe = eventlet.listen(('127.0.0.1', 0))
    free_port = probe.getsockname()[1]
    probe.close()

    target = ('127.0.0.1', free_port)
    self.assertRaises(socket.error, eventlet.connect, target)
549 def test_zero_timeout_and_back(self):
550 listen = eventlet.listen(('', 0))
551 # Keep reference to server side of socket
552 server = eventlet.spawn(listen.accept)
553 client = eventlet.connect(listen.getsockname())
555 client.settimeout(0.05)
556 # Now must raise socket.timeout
557 self.assertRaises(socket.timeout, client.recv, 1)
560 # Now must raise socket.error with EAGAIN
564 except socket.error as e:
565 assert get_errno(e) == errno.EAGAIN
567 client.settimeout(0.05)
568 # Now socket.timeout again
569 self.assertRaises(socket.timeout, client.recv, 1)
def test_default_nonblocking(self):
    """Green sockets report O_NONBLOCK on their descriptor by default.

    Checked for a freshly created socket and for a second socket built from
    the first one's ``fd`` attribute.
    """
    def nonblocking_bit(green_sock):
        # Status flags of the descriptor behind the green socket, masked
        # down to the O_NONBLOCK bit.
        status = fcntl.fcntl(green_sock.fd.fileno(), fcntl.F_GETFL)
        return status & os.O_NONBLOCK

    created = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    assert nonblocking_bit(created)

    rewrapped = socket.socket(created.fd)
    assert nonblocking_bit(rewrapped)
581 def test_dup_nonblocking(self):
582 sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
583 flags = fcntl.fcntl(sock1.fd.fileno(), fcntl.F_GETFL)
584 assert flags & os.O_NONBLOCK
587 flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
588 assert flags & os.O_NONBLOCK
def test_skip_nonblocking(self):
    """``set_nonblocking=False`` must leave a blocking descriptor blocking.

    The descriptor of a first socket is switched back to blocking mode, then
    a second socket is built from it with ``set_nonblocking=False`` and its
    descriptor is checked to still have O_NONBLOCK cleared.
    """
    sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    fd = sock1.fd.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
    # F_SETFL returns 0 on success rather than the new flag word, so read
    # the flags back; asserting on the F_SETFL result would always pass.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    assert flags & os.O_NONBLOCK == 0

    sock2 = socket.socket(sock1.fd, set_nonblocking=False)
    flags = fcntl.fcntl(sock2.fd.fileno(), fcntl.F_GETFL)
    assert flags & os.O_NONBLOCK == 0
601 def test_sockopt_interface(self):
602 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
603 assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 0
604 assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) == '\000'
605 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def test_socketpair_select(self):
    """One end of a socketpair is reported writable by a zero-timeout select.

    See https://github.com/eventlet/eventlet/pull/25
    """
    # The second end is kept bound to a name so the pair stays open.
    first, _second = socket.socketpair()
    expected = ([], [first], [])
    # The same check runs twice, back to back.
    for _ in range(2):
        assert select.select([], [first], [], 0) == expected
614 class TestGreenPipe(LimitedTestCase):
617 super(self.__class__, self).setUp()
618 self.tempdir = tempfile.mkdtemp('_green_pipe_test')
621 shutil.rmtree(self.tempdir)
622 super(self.__class__, self).tearDown()
626 rf = greenio.GreenPipe(r, 'r')
627 wf = greenio.GreenPipe(w, 'w', 0)
629 def sender(f, content):
631 eventlet.sleep(0.0001)
635 one_line = b"12345\n"
636 eventlet.spawn(sender, wf, one_line * 5)
640 self.assertEqual(line, one_line)
641 self.assertEqual(rf.readline(), '')
643 def test_pipe_read(self):
644 # ensure that 'readline' works properly on GreenPipes when data is not
645 # immediately available (fd is nonblocking, was raising EAGAIN)
646 # also ensures that readline() terminates on '\n' and '\r\n'
649 r = greenio.GreenPipe(r)
650 w = greenio.GreenPipe(w, 'w')
661 gt = eventlet.spawn(writer)
666 self.assertEqual(line, 'line\n')
669 self.assertEqual(line, 'line\r\n')
673 def test_pipe_writes_large_messages(self):
676 r = greenio.GreenPipe(r)
677 w = greenio.GreenPipe(w, 'w')
679 large_message = b"".join([1024 * chr(i) for i in range(65)])
682 w.write(large_message)
685 gt = eventlet.spawn(writer)
689 expected = 1024 * chr(i)
692 "expected=%r..%r, found=%r..%r iter=%d"
693 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
696 def test_seek_on_buffered_pipe(self):
697 f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
698 self.assertEqual(f.tell(), 0)
700 self.assertEqual(f.tell(), 0)
701 f.write(b'1234567890')
703 self.assertEqual(f.tell(), 10)
706 self.assertEqual(value, '1')
707 self.assertEqual(f.tell(), 1)
709 self.assertEqual(value, '2')
710 self.assertEqual(f.tell(), 2)
712 self.assertEqual(f.readline(), '34567890')
714 self.assertEqual(f.readline(), '67890')
716 self.assertEqual(f.readline(), '1234567890')
718 self.assertEqual(f.readline(), '')
720 def test_truncate(self):
721 f = greenio.GreenPipe(self.tempdir + "/TestFile", 'w+', 1024)
722 f.write(b'1234567890')
724 self.assertEqual(f.tell(), 9)
727 class TestGreenIoLong(LimitedTestCase):
728 TEST_TIMEOUT = 10 # the test here might take a while depending on the OS
731 def test_multiple_readers(self, clibufsize=False):
732 debug.hub_prevent_multiple_readers(False)
733 recvsize = 2 * min_buf_size()
734 sendsize = 10 * recvsize
736 # test that we can have multiple coroutines reading
737 # from the same fd. We make no guarantees about which one gets which
738 # bytes, but they should both get at least some
739 def reader(sock, results):
741 data = sock.recv(recvsize)
748 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
749 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
750 listener.bind(('127.0.0.1', 0))
754 (sock, addr) = listener.accept()
755 sock = bufsized(sock)
757 c1 = eventlet.spawn(reader, sock, results1)
758 c2 = eventlet.spawn(reader, sock, results2)
768 server_coro = eventlet.spawn(server)
769 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770 client.connect(('127.0.0.1', listener.getsockname()[1]))
772 bufsized(client, size=sendsize)
775 client.sendall(b'*' * sendsize)
779 assert len(results1) > 0
780 assert len(results2) > 0
781 debug.hub_prevent_multiple_readers()
783 @skipped # by rdw because it fails but it's not clear how to make it pass
785 def test_multiple_readers2(self):
786 self.test_multiple_readers(clibufsize=True)
789 class TestGreenIoStarvation(LimitedTestCase):
790 # fixme: this doesn't succeed, because of eventlet's predetermined
791 # ordering. two processes, one with server, one with client eventlets
792 # might be more reliable?
794 TEST_TIMEOUT = 300 # the test here might take a while depending on the OS
796 @skipped # by rdw, because it fails but it's not clear how to make it pass
798 def test_server_starvation(self, sendloops=15):
799 recvsize = 2 * min_buf_size()
800 sendsize = 10000 * recvsize
802 results = [[] for i in range(5)]
804 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
805 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
806 listener.bind(('127.0.0.1', 0))
807 port = listener.getsockname()[1]
810 base_time = time.time()
812 def server(my_results):
813 sock, addr = listener.accept()
821 data = sock.recv(recvsize)
823 t1 = time.time() - base_time
825 t2 = time.time() - base_time
826 my_results.append(datasize)
827 my_results.append((t1, t2))
829 datasize += len(data)
838 client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
839 client.connect(('127.0.0.1', port))
841 bufsized(client, size=sendsize)
843 for i in range(sendloops):
844 client.sendall(b'*' * sendsize)
851 servers.append(eventlet.spawn(server, r))
853 clients.append(client())
862 # now test that all of the server receive intervals overlap, and
863 # that there were no errors.
865 assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
866 assert r[0] == sendsize * sendloops
867 assert len(r[1]) == 2
868 assert r[1][0] is not None
869 assert r[1][1] is not None
871 starttimes = sorted(r[1][0] for r in results)
872 endtimes = sorted(r[1][1] for r in results)
873 runlengths = sorted(r[1][1] - r[1][0] for r in results)
875 # assert that the last task started before the first task ended
876 # (our no-starvation condition)
877 assert starttimes[-1] < endtimes[0], \
878 "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
880 maxstartdiff = starttimes[-1] - starttimes[0]
882 assert maxstartdiff * 2 < runlengths[0], \
883 "Largest difference in starting times more than twice the shortest running time!"
884 assert runlengths[0] * 2 > runlengths[-1], \
885 "Longest runtime more than twice as long as shortest!"
def test_set_nonblocking():
    """``greenio.set_nonblocking`` adds O_NONBLOCK and keeps the other status flags."""
    # Uses the unpatched socket class so the descriptor starts out blocking.
    plain = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
    fd = plain.fileno()

    def status_flags():
        # Current file status flags of the socket's descriptor.
        return fcntl.fcntl(fd, fcntl.F_GETFL)

    before = status_flags()
    assert before & os.O_NONBLOCK == 0

    greenio.set_nonblocking(plain)

    after = status_flags()
    assert after == (before | os.O_NONBLOCK)
898 if __name__ == '__main__':