import eventlet
import fcntl
import gc
+from io import DEFAULT_BUFFER_SIZE
import os
import shutil
import socket as _orig_sock
from nose.tools import eq_
+import eventlet
from eventlet import event, greenio, debug
from eventlet.hubs import get_hub
from eventlet.green import select, socket, time, ssl
import tests
-if six.PY3:
- buffer = memoryview
-
-
def bufsized(sock, size=1):
""" Resize both send and receive buffers on a socket.
Useful for testing trampoline. Returns the socket.
return sock
+def expect_socket_timeout(function, *args):
+ try:
+ function(*args)
+ raise AssertionError("socket.timeout not raised")
+ except socket.timeout as e:
+ assert hasattr(e, 'args')
+ eq_(e.args[0], 'timed out')
+
+
def min_buf_size():
"""Return the minimum buffer size that the platform supports."""
test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
+
try:
- gs.connect(('192.0.2.1', 80))
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(gs.connect, ('192.0.2.1', 80))
except socket.error as e:
# unreachable is also a valid outcome
if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
- try:
- gs.accept()
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(gs.accept)
def test_connect_ex_timeout(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(addr)
- try:
- client.recv(8192)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(client.recv, 0)
+ expect_socket_timeout(client.recv, 8192)
evt.send()
gt.wait()
gs.settimeout(.1)
gs.bind(('', 0))
- try:
- gs.recvfrom(8192)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(gs.recvfrom, 0)
+ expect_socket_timeout(gs.recvfrom, 8192)
def test_recvfrom_into_timeout(self):
buf = array.array('B')
gs.settimeout(.1)
gs.bind(('', 0))
- try:
- gs.recvfrom_into(buf)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(gs.recvfrom_into, buf)
def test_recv_into_timeout(self):
buf = array.array('B')
client.connect(addr)
- try:
- client.recv_into(buf)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ expect_socket_timeout(client.recv_into, buf)
evt.send()
gt.wait()
client = bufsized(greenio.GreenSocket(socket.socket()))
client.connect(addr)
- try:
- client.settimeout(0.00001)
- msg = b"A" * 100000 # large enough number to overwhelm most buffers
- total_sent = 0
- # want to exceed the size of the OS buffer so it'll block in a
- # single send
+ client.settimeout(0.00001)
+ msg = b"A" * 100000 # large enough number to overwhelm most buffers
+
+ # want to exceed the size of the OS buffer so it'll block in a
+ # single send
+ def send():
for x in range(10):
- total_sent += client.send(msg)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ client.send(msg)
+
+ expect_socket_timeout(send)
evt.send()
gt.wait()
client.settimeout(0.1)
client.connect(addr)
- try:
- msg = b"A" * (8 << 20)
-
- # want to exceed the size of the OS buffer so it'll block
- client.sendall(msg)
- self.fail("socket.timeout not raised")
- except socket.timeout as e:
- assert hasattr(e, 'args')
- self.assertEqual(e.args[0], 'timed out')
+ # want to exceed the size of the OS buffer so it'll block
+ msg = b"A" * (8 << 20)
+ expect_socket_timeout(client.sendall, msg)
evt.send()
gt.wait()
while True:
try:
sock.sendall(b'hello world')
+ # Arbitrary delay to not use all available CPU, keeps the test
+ # running quickly and reliably under a second
+ time.sleep(0.001)
except socket.error as e:
if get_errno(e) == errno.EPIPE:
return
while True:
data = client.recv(1024)
assert data
+ # Arbitrary delay to not use all available CPU, keeps the test
+ # running quickly and reliably under a second
+ time.sleep(0.001)
except socket.error as e:
# we get an EBADF because client is closed in the same process
# (but a different greenthread)
# should not raise
greenio.shutdown_safe(sock)
+ def test_datagram_socket_operations_work(self):
+ receiver = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
+ receiver.bind(('127.0.0.1', 0))
+ address = receiver.getsockname()
+
+ sender = greenio.GreenSocket(socket.AF_INET, socket.SOCK_DGRAM)
+
+ # Two ways sendto can be called
+ sender.sendto(b'first', address)
+ sender.sendto(b'second', 0, address)
+
+ sender_address = ('127.0.0.1', sender.getsockname()[1])
+ eq_(receiver.recvfrom(1024), (b'first', sender_address))
+ eq_(receiver.recvfrom(1024), (b'second', sender_address))
+
def test_get_fileno_of_a_socket_works():
class DummySocket(object):
gt.wait()
+ def test_pip_read_until_end(self):
+ # similar to test_pip_read above but reading until eof
+ r, w = os.pipe()
+
+ r = greenio.GreenPipe(r, 'rb')
+ w = greenio.GreenPipe(w, 'wb')
+
+ w.write(b'c' * DEFAULT_BUFFER_SIZE * 2)
+ w.close()
+
+ buf = r.read() # no chunk size specified; read until end
+ self.assertEqual(len(buf), 2 * DEFAULT_BUFFER_SIZE)
+ self.assertEqual(buf[:3], b'ccc')
+
+ def test_pipe_read_unbuffered(self):
+ # Ensure that seting the buffer size works properly on GreenPipes,
+ # it used to be ignored on Python 2 and the test would hang on r.readline()
+ # below.
+ r, w = os.pipe()
+
+ r = greenio.GreenPipe(r, 'rb', 0)
+ w = greenio.GreenPipe(w, 'wb', 0)
+
+ w.write(b'line\n')
+
+ line = r.readline()
+ self.assertEqual(line, b'line\n')
+ r.close()
+ w.close()
+
def test_pipe_writes_large_messages(self):
r, w = os.pipe()
TEST_TIMEOUT = 10 # the test here might take a while depending on the OS
@tests.skip_with_pyevent
- def test_multiple_readers(self, clibufsize=False):
+ def test_multiple_readers(self):
debug.hub_prevent_multiple_readers(False)
recvsize = 2 * min_buf_size()
sendsize = 10 * recvsize
server_coro = eventlet.spawn(server)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', listener.getsockname()[1]))
- if clibufsize:
- bufsized(client, size=sendsize)
- else:
- bufsized(client)
- client.sendall(b'*' * sendsize)
+ bufsized(client, size=sendsize)
+
+ # Split into multiple chunks so that we can wait a little
+ # every iteration which allows both readers to queue and
+ # recv some data when we actually send it.
+ for i in range(20):
+ eventlet.sleep(0.001)
+ client.sendall(b'*' * (sendsize // 20))
+
client.close()
server_coro.wait()
listener.close()
assert len(results2) > 0
debug.hub_prevent_multiple_readers()
- @tests.skipped # by rdw because it fails but it's not clear how to make it pass
- @tests.skip_with_pyevent
- def test_multiple_readers2(self):
- self.test_multiple_readers(clibufsize=True)
-
-
-class TestGreenIoStarvation(tests.LimitedTestCase):
- # fixme: this doesn't succeed, because of eventlet's predetermined
- # ordering. two processes, one with server, one with client eventlets
- # might be more reliable?
-
- TEST_TIMEOUT = 300 # the test here might take a while depending on the OS
-
- @tests.skipped # by rdw, because it fails but it's not clear how to make it pass
- @tests.skip_with_pyevent
- def test_server_starvation(self, sendloops=15):
- recvsize = 2 * min_buf_size()
- sendsize = 10000 * recvsize
-
- results = [[] for i in range(5)]
-
- listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- listener.bind(('127.0.0.1', 0))
- port = listener.getsockname()[1]
- listener.listen(50)
-
- base_time = time.time()
-
- def server(my_results):
- sock, addr = listener.accept()
-
- datasize = 0
-
- t1 = None
- t2 = None
- try:
- while True:
- data = sock.recv(recvsize)
- if not t1:
- t1 = time.time() - base_time
- if not data:
- t2 = time.time() - base_time
- my_results.append(datasize)
- my_results.append((t1, t2))
- break
- datasize += len(data)
- finally:
- sock.close()
-
- def client():
- pid = os.fork()
- if pid:
- return pid
-
- client = _orig_sock.socket(socket.AF_INET, socket.SOCK_STREAM)
- client.connect(('127.0.0.1', port))
-
- bufsized(client, size=sendsize)
-
- for i in range(sendloops):
- client.sendall(b'*' * sendsize)
- client.close()
- os._exit(0)
-
- clients = []
- servers = []
- for r in results:
- servers.append(eventlet.spawn(server, r))
- for r in results:
- clients.append(client())
-
- for s in servers:
- s.wait()
- for c in clients:
- os.waitpid(c, 0)
-
- listener.close()
-
- # now test that all of the server receive intervals overlap, and
- # that there were no errors.
- for r in results:
- assert len(r) == 2, "length is %d not 2!: %s\n%s" % (len(r), r, results)
- assert r[0] == sendsize * sendloops
- assert len(r[1]) == 2
- assert r[1][0] is not None
- assert r[1][1] is not None
-
- starttimes = sorted(r[1][0] for r in results)
- endtimes = sorted(r[1][1] for r in results)
- runlengths = sorted(r[1][1] - r[1][0] for r in results)
-
- # assert that the last task started before the first task ended
- # (our no-starvation condition)
- assert starttimes[-1] < endtimes[0], \
- "Not overlapping: starts %s ends %s" % (starttimes, endtimes)
-
- maxstartdiff = starttimes[-1] - starttimes[0]
-
- assert maxstartdiff * 2 < runlengths[0], \
- "Largest difference in starting times more than twice the shortest running time!"
- assert runlengths[0] * 2 > runlengths[-1], \
- "Longest runtime more than twice as long as shortest!"
-
def test_set_nonblocking():
sock = _orig_sock.socket(socket.AF_INET, socket.SOCK_DGRAM)
def test_double_close_219():
tests.run_isolated('greenio_double_close_219.py')
+
+
+def test_partial_write_295():
+ # https://github.com/eventlet/eventlet/issues/295
+ # `socket.makefile('w').writelines()` must send all
+ # despite partial writes by underlying socket
+ listen_socket = eventlet.listen(('localhost', 0))
+ original_accept = listen_socket.accept
+
+ def talk(conn):
+ f = conn.makefile('wb')
+ line = b'*' * 2140
+ f.writelines([line] * 10000)
+ conn.close()
+
+ def accept():
+ connection, address = original_accept()
+ original_send = connection.send
+
+ def slow_send(b, *args):
+ b = b[:1031]
+ return original_send(b, *args)
+
+ connection.send = slow_send
+ eventlet.spawn(talk, connection)
+ return connection, address
+
+ listen_socket.accept = accept
+
+ eventlet.spawn(listen_socket.accept)
+ sock = eventlet.connect(listen_socket.getsockname())
+ with eventlet.Timeout(10):
+ bs = sock.makefile('rb').read()
+ assert len(bs) == 21400000
+ assert bs == (b'*' * 21400000)
+
+
+def test_socket_file_read_non_int():
+ listen_socket = eventlet.listen(('localhost', 0))
+
+ def server():
+ conn, _ = listen_socket.accept()
+ conn.recv(1)
+ conn.sendall('response')
+ conn.close()
+
+ eventlet.spawn(server)
+ sock = eventlet.connect(listen_socket.getsockname())
+
+ fd = sock.makefile('rwb')
+ fd.write(b'?')
+ fd.flush()
+ with eventlet.Timeout(1):
+ try:
+ fd.read("This shouldn't work")
+ assert False
+ except TypeError:
+ pass
+
+
+def test_pipe_context():
+ # ensure using a pipe as a context actually closes it.
+ r, w = os.pipe()
+ r = greenio.GreenPipe(r)
+ w = greenio.GreenPipe(w, 'w')
+
+ with r:
+ pass
+ assert r.closed and not w.closed
+
+ with w as f:
+ assert f == w
+ assert r.closed and w.closed