X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=python-eventlet%2Ftests%2Fgreenio_test.py;h=99119b37bc1d4abfe7aee0897e11850b1a32e83e;hb=3dbfedbaa1a106967b7588f6ce50b89788837a33;hp=8a94b7bddd80d1b1d45460ddac34e0f15192260d;hpb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/python-eventlet/tests/greenio_test.py b/python-eventlet/tests/greenio_test.py index 8a94b7b..99119b3 100644 --- a/python-eventlet/tests/greenio_test.py +++ b/python-eventlet/tests/greenio_test.py @@ -3,6 +3,7 @@ import errno import eventlet import fcntl import gc +from io import DEFAULT_BUFFER_SIZE import os import shutil import socket as _orig_sock @@ -11,6 +12,7 @@ import tempfile from nose.tools import eq_ +import eventlet from eventlet import event, greenio, debug from eventlet.hubs import get_hub from eventlet.green import select, socket, time, ssl @@ -18,10 +20,6 @@ from eventlet.support import capture_stderr, get_errno, six import tests -if six.PY3: - buffer = memoryview - - def bufsized(sock, size=1): """ Resize both send and receive buffers on a socket. Useful for testing trampoline. Returns the socket. @@ -34,6 +32,15 @@ def bufsized(sock, size=1): return sock +def expect_socket_timeout(function, *args): + try: + function(*args) + raise AssertionError("socket.timeout not raised") + except socket.timeout as e: + assert hasattr(e, 'args') + eq_(e.args[0], 'timed out') + + def min_buf_size(): """Return the minimum buffer size that the platform supports.""" test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -71,12 +78,9 @@ class TestGreenSocket(tests.LimitedTestCase): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.1) gs = greenio.GreenSocket(s) + try: - gs.connect(('192.0.2.1', 80)) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(gs.connect, ('192.0.2.1', 80)) except socket.error as e: # unreachable is also a valid outcome if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH): @@ -89,12 +93,7 @@ class TestGreenSocket(tests.LimitedTestCase): s.settimeout(0.1) gs = greenio.GreenSocket(s) - try: - gs.accept() - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(gs.accept) def test_connect_ex_timeout(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -125,12 +124,8 @@ class TestGreenSocket(tests.LimitedTestCase): client.connect(addr) - try: - client.recv(8192) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(client.recv, 0) + expect_socket_timeout(client.recv, 8192) evt.send() gt.wait() @@ -141,12 +136,8 @@ class TestGreenSocket(tests.LimitedTestCase): gs.settimeout(.1) gs.bind(('', 0)) - try: - gs.recvfrom(8192) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(gs.recvfrom, 0) + expect_socket_timeout(gs.recvfrom, 8192) def test_recvfrom_into_timeout(self): buf = array.array('B') @@ -156,12 +147,7 @@ class TestGreenSocket(tests.LimitedTestCase): gs.settimeout(.1) gs.bind(('', 0)) - try: - gs.recvfrom_into(buf) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(gs.recvfrom_into, buf) def test_recv_into_timeout(self): buf = array.array('B') @@ -186,12 +172,7 @@ class TestGreenSocket(tests.LimitedTestCase): client.connect(addr) - try: - client.recv_into(buf) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + expect_socket_timeout(client.recv_into, buf) evt.send() gt.wait() @@ -214,19 +195,17 @@ class TestGreenSocket(tests.LimitedTestCase): client = bufsized(greenio.GreenSocket(socket.socket())) client.connect(addr) - try: - client.settimeout(0.00001) - msg = b"A" * 100000 # large enough number to overwhelm most buffers - total_sent = 0 - # want to exceed the size of the OS buffer so it'll block in a - # single send + client.settimeout(0.00001) + msg = b"A" * 100000 # large enough number to overwhelm most buffers + + # want to exceed the size of the OS buffer so it'll block in a + # single send + def send(): for x in range(10): - total_sent += client.send(msg) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + client.send(msg) + + expect_socket_timeout(send) evt.send() gt.wait() @@ -251,15 +230,9 @@ class TestGreenSocket(tests.LimitedTestCase): client.settimeout(0.1) client.connect(addr) - try: - msg = b"A" * (8 << 20) - - # want to exceed the size of the OS buffer so it'll block - client.sendall(msg) - self.fail("socket.timeout not raised") - except socket.timeout as e: - assert hasattr(e, 'args') - self.assertEqual(e.args[0], 'timed out') + # want to exceed the size of the OS buffer so it'll block + msg = b"A" * (8 << 20) + expect_socket_timeout(client.sendall, msg) evt.send() gt.wait() @@ -509,6 +482,9 @@ class TestGreenSocket(tests.LimitedTestCase): while True: try: sock.sendall(b'hello world') + # Arbitrary delay to not use all available CPU, keeps the test + # running quickly and reliably under a second + time.sleep(0.001) except socket.error as e: if get_errno(e) == errno.EPIPE: return @@ -524,6 +500,9 @@ class TestGreenSocket(tests.LimitedTestCase): while True: data = client.recv(1024) assert data + # Arbitrary delay to not use all available CPU, keeps the test + # running quickly and reliably under a second + time.sleep(0.001) except socket.error as e: # we get an EBADF because client is closed in the same process # (but a different greenthread) @@ -615,6 +594,21 @@ class TestGreenSocket(tests.LimitedTestCase): # should not raise greenio.shutdown_safe(sock) + def test_datagram_socket_operations_work(self): + receiver = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM) + receiver.bind(('127.0.0.1', 0)) + address = receiver.getsockname() + + sender = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM) + + # Two ways sendto can be called + sender.sendto(b'first', address) + sender.sendto(b'second', 0, address) + + sender_address = ('127.0.0.1', sender.getsockname()[1]) + eq_(receiver.recvfrom(1024), (b'first', sender_address)) + eq_(receiver.recvfrom(1024), (b'second', sender_address)) + def test_get_fileno_of_a_socket_works(): class DummySocket(object): @@ -711,6 +705,36 @@ class TestGreenPipe(tests.LimitedTestCase): gt.wait() + def test_pip_read_until_end(self): + # similar to test_pip_read above but reading until eof + r, w = os.pipe() + + r = greenio.GreenPipe(r, 'rb') + w = greenio.GreenPipe(w, 'wb') + + w.write(b'c' * DEFAULT_BUFFER_SIZE * 2) + w.close() + + buf = r.read() # no chunk size specified; read until end + self.assertEqual(len(buf), 2 * DEFAULT_BUFFER_SIZE) + self.assertEqual(buf[:3], b'ccc') + + def test_pipe_read_unbuffered(self): + # Ensure that seting the buffer size works properly on GreenPipes, + # it used to be ignored on Python 2 and the test would hang on r.readline() + # below. + r, w = os.pipe() + + r = greenio.GreenPipe(r, 'rb', 0) + w = greenio.GreenPipe(w, 'wb', 0) + + w.write(b'line\n') + + line = r.readline() + self.assertEqual(line, b'line\n') + r.close() + w.close() + def test_pipe_writes_large_messages(self): r, w = os.pipe() @@ -769,7 +793,7 @@ class TestGreenIoLong(tests.LimitedTestCase): TEST_TIMEOUT = 10 # the test here might take a while depending on the OS @tests.skip_with_pyevent - def test_multiple_readers(self, clibufsize=False): + def test_multiple_readers(self): debug.hub_prevent_multiple_readers(False) recvsize = 2 * min_buf_size() sendsize = 10 * recvsize @@ -809,11 +833,15 @@ class TestGreenIoLong(tests.LimitedTestCase): server_coro = eventlet.spawn(server) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', listener.getsockname()[1])) - if clibufsize: - bufsized(client, size=sendsize) - else: - bufsized(client) - client.sendall(b'*' * sendsize) + bufsized(client, size=sendsize) + + # Split into multiple chunks so that we can wait a little + # every iteration which allows both readers to queue and + # recv some data when we actually send it. + for i in range(20): + eventlet.sleep(0.001) + client.sendall(b'*' * (sendsize // 20)) + client.close() server_coro.wait() listener.close() @@ -821,110 +849,6 @@ class TestGreenIoLong(tests.LimitedTestCase): assert len(results2) > 0 debug.hub_prevent_multiple_readers() - @tests.skipped # by rdw because it fails but it's not clear how to make it pass - @tests.skip_with_pyevent - def test_multiple_readers2(self): - self.test_multiple_readers(clibufsize=True) - - -class TestGreenIoStarvation(tests.LimitedTestCase): - # fixme: this doesn't succeed, because of eventlet's predetermined - # ordering. two processes, one with server, one with client eventlets - # might be more reliable? - - TEST_TIMEOUT = 300 # the test here might take a while depending on the OS - - @tests.skipped # by rdw, because it fails but it's not clear how to make it pass - @tests.skip_with_pyevent - def test_server_starvation(self, sendloops=15): - recvsize = 2 * min_buf_size() - sendsize = 10000 * recvsize - - results = [[] for i in range(5)] - - listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind(('127.0.0.1', 0)) - port = listener.getsockname()[1] - listener.listen(50) - - base_time = time.time() - - def server(my_results): - sock, addr = listener.accept() - - datasize = 0 - - t1 = None - t2 = None - try: - while True: - data = sock.recv(recvsize) - if not t1: - t1 = time.time() - base_time - if not data: - t2 = time.time() - base_time - my_results.append(datasize) - my_results.append((t1, t2)) - break - datasize += len(data) - finally: - sock.close() - - def client(): - pid = os.fork() - if pid: - return pid - - client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM) - client.connect(('127.0.0.1', port)) - - bufsized(client, size=sendsize) - - for i in range(sendloops): - client.sendall(b'*' * sendsize) - client.close() - os._exit(0) - - clients = [] - servers = [] - for r in results: - servers.append(eventlet.spawn(server, r)) - for r in results: - clients.append(client()) - - for s in servers: - s.wait() - for c in clients: - os.waitpid(c, 0) - - listener.close() - - # now test that all of the server receive intervals overlap, and - # that there were no errors. - for r in results: - assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results) - assert r[0] == sendsize * sendloops - assert len(r[1]) == 2 - assert r[1][0] is not None - assert r[1][1] is not None - - starttimes = sorted(r[1][0] for r in results) - endtimes = sorted(r[1][1] for r in results) - runlengths = sorted(r[1][1] - r[1][0] for r in results) - - # assert that the last task started before the first task ended - # (our no-starvation condition) - assert starttimes[-1] < endtimes[0], \ - "Not overlapping: starts %s ends %s" % (starttimes, endtimes) - - maxstartdiff = starttimes[-1] - starttimes[0] - - assert maxstartdiff * 2 < runlengths[0], \ - "Largest difference in starting times more than twice the shortest running time!" - assert runlengths[0] * 2 > runlengths[-1], \ - "Longest runtime more than twice as long as shortest!" - def test_set_nonblocking(): sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -954,3 +878,76 @@ def test_socket_del_fails_gracefully_when_not_fully_initialized(): def test_double_close_219(): tests.run_isolated('greenio_double_close_219.py') + + +def test_partial_write_295(): + # https://github.com/eventlet/eventlet/issues/295 + # `socket.makefile('w').writelines()` must send all + # despite partial writes by underlying socket + listen_socket = eventlet.listen(('localhost', 0)) + original_accept = listen_socket.accept + + def talk(conn): + f = conn.makefile('wb') + line = b'*' * 2140 + f.writelines([line] * 10000) + conn.close() + + def accept(): + connection, address = original_accept() + original_send = connection.send + + def slow_send(b, *args): + b = b[:1031] + return original_send(b, *args) + + connection.send = slow_send + eventlet.spawn(talk, connection) + return connection, address + + listen_socket.accept = accept + + eventlet.spawn(listen_socket.accept) + sock = eventlet.connect(listen_socket.getsockname()) + with eventlet.Timeout(10): + bs = sock.makefile('rb').read() + assert len(bs) == 21400000 + assert bs == (b'*' * 21400000) + + +def test_socket_file_read_non_int(): + listen_socket = eventlet.listen(('localhost', 0)) + + def server(): + conn, _ = listen_socket.accept() + conn.recv(1) + conn.sendall('response') + conn.close() + + eventlet.spawn(server) + sock = eventlet.connect(listen_socket.getsockname()) + + fd = sock.makefile('rwb') + fd.write(b'?') + fd.flush() + with eventlet.Timeout(1): + try: + fd.read("This shouldn't work") + assert False + except TypeError: + pass + + +def test_pipe_context(): + # ensure using a pipe as a context actually closes it. + r, w = os.pipe() + r = greenio.GreenPipe(r) + w = greenio.GreenPipe(w, 'w') + + with r: + pass + assert r.closed and not w.closed + + with w as f: + assert f == w + assert r.closed and w.closed