Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / greenio_test.py
index 8a94b7bddd80d1b1d45460ddac34e0f15192260d..99119b37bc1d4abfe7aee0897e11850b1a32e83e 100644 (file)
@@ -3,6 +3,7 @@ import errno
 import eventlet
 import fcntl
 import gc
+from io import DEFAULT_BUFFER_SIZE
 import os
 import shutil
 import socket as _orig_sock
@@ -11,6 +12,7 @@ import tempfile
 
 from nose.tools import eq_
 
+import eventlet
 from eventlet import event, greenio, debug
 from eventlet.hubs import get_hub
 from eventlet.green import select, socket, time, ssl
@@ -18,10 +20,6 @@ from eventlet.support import capture_stderr, get_errno, six
 import tests
 
 
-if six.PY3:
-    buffer = memoryview
-
-
 def bufsized(sock, size=1):
     """ Resize both send and receive buffers on a socket.
     Useful for testing trampoline.  Returns the socket.
@@ -34,6 +32,15 @@ def bufsized(sock, size=1):
     return sock
 
 
+def expect_socket_timeout(function, *args):
+    try:
+        function(*args)
+        raise AssertionError("socket.timeout not raised")
+    except socket.timeout as e:
+        assert hasattr(e, 'args')
+        eq_(e.args[0], 'timed out')
+
+
 def min_buf_size():
     """Return the minimum buffer size that the platform supports."""
     test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -71,12 +78,9 @@ class TestGreenSocket(tests.LimitedTestCase):
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         s.settimeout(0.1)
         gs = greenio.GreenSocket(s)
+
         try:
-            gs.connect(('192.0.2.1', 80))
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+            expect_socket_timeout(gs.connect, ('192.0.2.1', 80))
         except socket.error as e:
             # unreachable is also a valid outcome
             if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
@@ -89,12 +93,7 @@ class TestGreenSocket(tests.LimitedTestCase):
 
         s.settimeout(0.1)
         gs = greenio.GreenSocket(s)
-        try:
-            gs.accept()
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        expect_socket_timeout(gs.accept)
 
     def test_connect_ex_timeout(self):
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -125,12 +124,8 @@ class TestGreenSocket(tests.LimitedTestCase):
 
         client.connect(addr)
 
-        try:
-            client.recv(8192)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        expect_socket_timeout(client.recv, 0)
+        expect_socket_timeout(client.recv, 8192)
 
         evt.send()
         gt.wait()
@@ -141,12 +136,8 @@ class TestGreenSocket(tests.LimitedTestCase):
         gs.settimeout(.1)
         gs.bind(('', 0))
 
-        try:
-            gs.recvfrom(8192)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        expect_socket_timeout(gs.recvfrom, 0)
+        expect_socket_timeout(gs.recvfrom, 8192)
 
     def test_recvfrom_into_timeout(self):
         buf = array.array('B')
@@ -156,12 +147,7 @@ class TestGreenSocket(tests.LimitedTestCase):
         gs.settimeout(.1)
         gs.bind(('', 0))
 
-        try:
-            gs.recvfrom_into(buf)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        expect_socket_timeout(gs.recvfrom_into, buf)
 
     def test_recv_into_timeout(self):
         buf = array.array('B')
@@ -186,12 +172,7 @@ class TestGreenSocket(tests.LimitedTestCase):
 
         client.connect(addr)
 
-        try:
-            client.recv_into(buf)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        expect_socket_timeout(client.recv_into, buf)
 
         evt.send()
         gt.wait()
@@ -214,19 +195,17 @@ class TestGreenSocket(tests.LimitedTestCase):
 
         client = bufsized(greenio.GreenSocket(socket.socket()))
         client.connect(addr)
-        try:
-            client.settimeout(0.00001)
-            msg = b"A" * 100000  # large enough number to overwhelm most buffers
 
-            total_sent = 0
-            # want to exceed the size of the OS buffer so it'll block in a
-            # single send
+        client.settimeout(0.00001)
+        msg = b"A" * 100000  # large enough number to overwhelm most buffers
+
+        # want to exceed the size of the OS buffer so it'll block in a
+        # single send
+        def send():
             for x in range(10):
-                total_sent += client.send(msg)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+                client.send(msg)
+
+        expect_socket_timeout(send)
 
         evt.send()
         gt.wait()
@@ -251,15 +230,9 @@ class TestGreenSocket(tests.LimitedTestCase):
         client.settimeout(0.1)
         client.connect(addr)
 
-        try:
-            msg = b"A" * (8 << 20)
-
-            # want to exceed the size of the OS buffer so it'll block
-            client.sendall(msg)
-            self.fail("socket.timeout not raised")
-        except socket.timeout as e:
-            assert hasattr(e, 'args')
-            self.assertEqual(e.args[0], 'timed out')
+        # want to exceed the size of the OS buffer so it'll block
+        msg = b"A" * (8 << 20)
+        expect_socket_timeout(client.sendall, msg)
 
         evt.send()
         gt.wait()
@@ -509,6 +482,9 @@ class TestGreenSocket(tests.LimitedTestCase):
             while True:
                 try:
                     sock.sendall(b'hello world')
+                    # Arbitrary delay to not use all available CPU, keeps the test
+                    # running quickly and reliably under a second
+                    time.sleep(0.001)
                 except socket.error as e:
                     if get_errno(e) == errno.EPIPE:
                         return
@@ -524,6 +500,9 @@ class TestGreenSocket(tests.LimitedTestCase):
                 while True:
                     data = client.recv(1024)
                     assert data
+                    # Arbitrary delay to not use all available CPU, keeps the test
+                    # running quickly and reliably under a second
+                    time.sleep(0.001)
             except socket.error as e:
                 # we get an EBADF because client is closed in the same process
                 # (but a different greenthread)
@@ -615,6 +594,21 @@ class TestGreenSocket(tests.LimitedTestCase):
         # should not raise
         greenio.shutdown_safe(sock)
 
+    def test_datagram_socket_operations_work(self):
+        receiver = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
+        receiver.bind(('127.0.0.1', 0))
+        address = receiver.getsockname()
+
+        sender = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
+
+        # Two ways sendto can be called
+        sender.sendto(b'first', address)
+        sender.sendto(b'second', 0, address)
+
+        sender_address = ('127.0.0.1', sender.getsockname()[1])
+        eq_(receiver.recvfrom(1024), (b'first', sender_address))
+        eq_(receiver.recvfrom(1024), (b'second', sender_address))
+
 
 def test_get_fileno_of_a_socket_works():
     class DummySocket(object):
@@ -711,6 +705,36 @@ class TestGreenPipe(tests.LimitedTestCase):
 
         gt.wait()
 
+    def test_pip_read_until_end(self):
+        # similar to test_pip_read above but reading until eof
+        r, w = os.pipe()
+
+        r = greenio.GreenPipe(r, 'rb')
+        w = greenio.GreenPipe(w, 'wb')
+
+        w.write(b'c' * DEFAULT_BUFFER_SIZE * 2)
+        w.close()
+
+        buf = r.read()  # no chunk size specified; read until end
+        self.assertEqual(len(buf), 2 * DEFAULT_BUFFER_SIZE)
+        self.assertEqual(buf[:3], b'ccc')
+
+    def test_pipe_read_unbuffered(self):
+        # Ensure that seting the buffer size works properly on GreenPipes,
+        # it used to be ignored on Python 2 and the test would hang on r.readline()
+        # below.
+        r, w = os.pipe()
+
+        r = greenio.GreenPipe(r, 'rb', 0)
+        w = greenio.GreenPipe(w, 'wb', 0)
+
+        w.write(b'line\n')
+
+        line = r.readline()
+        self.assertEqual(line, b'line\n')
+        r.close()
+        w.close()
+
     def test_pipe_writes_large_messages(self):
         r, w = os.pipe()
 
@@ -769,7 +793,7 @@ class TestGreenIoLong(tests.LimitedTestCase):
     TEST_TIMEOUT = 10  # the test here might take a while depending on the OS
 
     @tests.skip_with_pyevent
-    def test_multiple_readers(self, clibufsize=False):
+    def test_multiple_readers(self):
         debug.hub_prevent_multiple_readers(False)
         recvsize = 2 * min_buf_size()
         sendsize = 10 * recvsize
@@ -809,11 +833,15 @@ class TestGreenIoLong(tests.LimitedTestCase):
         server_coro = eventlet.spawn(server)
         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         client.connect(('127.0.0.1', listener.getsockname()[1]))
-        if clibufsize:
-            bufsized(client, size=sendsize)
-        else:
-            bufsized(client)
-        client.sendall(b'*' * sendsize)
+        bufsized(client, size=sendsize)
+
+        # Split into multiple chunks so that we can wait a little
+        # every iteration which allows both readers to queue and
+        # recv some data when we actually send it.
+        for i in range(20):
+            eventlet.sleep(0.001)
+            client.sendall(b'*' * (sendsize // 20))
+
         client.close()
         server_coro.wait()
         listener.close()
@@ -821,110 +849,6 @@ class TestGreenIoLong(tests.LimitedTestCase):
         assert len(results2) > 0
         debug.hub_prevent_multiple_readers()
 
-    @tests.skipped  # by rdw because it fails but it's not clear how to make it pass
-    @tests.skip_with_pyevent
-    def test_multiple_readers2(self):
-        self.test_multiple_readers(clibufsize=True)
-
-
-class TestGreenIoStarvation(tests.LimitedTestCase):
-    # fixme: this doesn't succeed, because of eventlet's predetermined
-    # ordering.  two processes, one with server, one with client eventlets
-    # might be more reliable?
-
-    TEST_TIMEOUT = 300  # the test here might take a while depending on the OS
-
-    @tests.skipped  # by rdw, because it fails but it's not clear how to make it pass
-    @tests.skip_with_pyevent
-    def test_server_starvation(self, sendloops=15):
-        recvsize = 2 * min_buf_size()
-        sendsize = 10000 * recvsize
-
-        results = [[] for i in range(5)]
-
-        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        listener.bind(('127.0.0.1', 0))
-        port = listener.getsockname()[1]
-        listener.listen(50)
-
-        base_time = time.time()
-
-        def server(my_results):
-            sock, addr = listener.accept()
-
-            datasize = 0
-
-            t1 = None
-            t2 = None
-            try:
-                while True:
-                    data = sock.recv(recvsize)
-                    if not t1:
-                        t1 = time.time() - base_time
-                    if not data:
-                        t2 = time.time() - base_time
-                        my_results.append(datasize)
-                        my_results.append((t1, t2))
-                        break
-                    datasize += len(data)
-            finally:
-                sock.close()
-
-        def client():
-            pid = os.fork()
-            if pid:
-                return pid
-
-            client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
-            client.connect(('127.0.0.1', port))
-
-            bufsized(client, size=sendsize)
-
-            for i in range(sendloops):
-                client.sendall(b'*' * sendsize)
-            client.close()
-            os._exit(0)
-
-        clients = []
-        servers = []
-        for r in results:
-            servers.append(eventlet.spawn(server, r))
-        for r in results:
-            clients.append(client())
-
-        for s in servers:
-            s.wait()
-        for c in clients:
-            os.waitpid(c, 0)
-
-        listener.close()
-
-        # now test that all of the server receive intervals overlap, and
-        # that there were no errors.
-        for r in results:
-            assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
-            assert r[0] == sendsize * sendloops
-            assert len(r[1]) == 2
-            assert r[1][0] is not None
-            assert r[1][1] is not None
-
-        starttimes = sorted(r[1][0] for r in results)
-        endtimes = sorted(r[1][1] for r in results)
-        runlengths = sorted(r[1][1] - r[1][0] for r in results)
-
-        # assert that the last task started before the first task ended
-        # (our no-starvation condition)
-        assert starttimes[-1] < endtimes[0], \
-            "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
-
-        maxstartdiff = starttimes[-1] - starttimes[0]
-
-        assert maxstartdiff * 2 < runlengths[0], \
-            "Largest difference in starting times more than twice the shortest running time!"
-        assert runlengths[0] * 2 > runlengths[-1], \
-            "Longest runtime more than twice as long as shortest!"
-
 
 def test_set_nonblocking():
     sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -954,3 +878,76 @@ def test_socket_del_fails_gracefully_when_not_fully_initialized():
 
 def test_double_close_219():
     tests.run_isolated('greenio_double_close_219.py')
+
+
+def test_partial_write_295():
+    # https://github.com/eventlet/eventlet/issues/295
+    # `socket.makefile('w').writelines()` must send all
+    # despite partial writes by underlying socket
+    listen_socket = eventlet.listen(('localhost', 0))
+    original_accept = listen_socket.accept
+
+    def talk(conn):
+        f = conn.makefile('wb')
+        line = b'*' * 2140
+        f.writelines([line] * 10000)
+        conn.close()
+
+    def accept():
+        connection, address = original_accept()
+        original_send = connection.send
+
+        def slow_send(b, *args):
+            b = b[:1031]
+            return original_send(b, *args)
+
+        connection.send = slow_send
+        eventlet.spawn(talk, connection)
+        return connection, address
+
+    listen_socket.accept = accept
+
+    eventlet.spawn(listen_socket.accept)
+    sock = eventlet.connect(listen_socket.getsockname())
+    with eventlet.Timeout(10):
+        bs = sock.makefile('rb').read()
+    assert len(bs) == 21400000
+    assert bs == (b'*' * 21400000)
+
+
+def test_socket_file_read_non_int():
+    listen_socket = eventlet.listen(('localhost', 0))
+
+    def server():
+        conn, _ = listen_socket.accept()
+        conn.recv(1)
+        conn.sendall('response')
+        conn.close()
+
+    eventlet.spawn(server)
+    sock = eventlet.connect(listen_socket.getsockname())
+
+    fd = sock.makefile('rwb')
+    fd.write(b'?')
+    fd.flush()
+    with eventlet.Timeout(1):
+        try:
+            fd.read("This shouldn't work")
+            assert False
+        except TypeError:
+            pass
+
+
+def test_pipe_context():
+    # ensure using a pipe as a context actually closes it.
+    r, w = os.pipe()
+    r = greenio.GreenPipe(r)
+    w = greenio.GreenPipe(w, 'w')
+
+    with r:
+        pass
+    assert r.closed and not w.closed
+
+    with w as f:
+        assert f == w
+    assert r.closed and w.closed