2 from unittest import TestCase, main
4 from nose.tools import eq_
7 from eventlet import greenio, hubs, greenthread, spawn
8 from eventlet.green import ssl
9 from tests import skip_if_no_ssl
# Sanity checks on the hub: whatever hub.get_readers() and hub.get_writers()
# return must be empty (falsy), and hub.running must be falsy.
# NOTE(review): the enclosing definition and the binding of `hub` are not
# visible in this extract (the embedded numbering skips 14-16 and 20) --
# confirm against the full file before editing.
13 # Clear through the descriptor queue
17 for nm in 'get_readers', 'get_writers':
# Look the accessor up by name so the failure message can report which one
# was non-empty, together with its contents.
18 dct = getattr(hub, nm)()
19 assert not dct, "hub.%s not empty: %s" % (nm, dct)
21 assert not hub.running
# unittest.TestCase subclass holding the API-level tests that follow.
24 class TestApi(TestCase):
# Certificate and private-key paths are resolved relative to the directory of
# this test module; test_connect_ssl passes both to eventlet.wrap_ssl.
# NOTE(review): `os` is not among the imports visible in this extract --
# confirm it is imported on an elided line.
26 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
27 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
# Listening on ('0.0.0.0', 0) must yield a socket whose reported host is
# '0.0.0.0'.
# NOTE(review): the embedded numbering jumps from 31 to 36, so any trailing
# statements of this test are elided here -- confirm whether the listener is
# closed afterwards.
# NOTE(review): the local name `socket` would shadow the stdlib module of the
# same name if that module is imported in this file.
29 def test_tcp_listener(self):
30 socket = eventlet.listen(('0.0.0.0', 0))
31 assert socket.getsockname()[0] == '0.0.0.0'
# Round-trip over plain TCP: a spawned server-side helper accepts one
# connection, and the client expects to read the line b'hello\n' from it.
# NOTE(review): embedded lines 38, 41-46, 49, 52 and 54+ are elided in this
# extract; the statements that write the greeting and close the sockets are
# presumably among them -- confirm.
36 def test_connect_tcp(self):
# Server side: accept a single connection and open a binary write file on it.
37 def accept_once(listenfd):
39 conn, addr = listenfd.accept()
40 fd = conn.makefile(mode='wb')
# Bind with port 0; the port actually in use is read back with getsockname()
# when the client connects below.
47 server = eventlet.listen(('0.0.0.0', 0))
48 eventlet.spawn_n(accept_once, server)
50 client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
51 fd = client.makefile('rb')
53 eq_(fd.readline(), b'hello\n')
# Round-trip over SSL: the server side writes b'hello\r\n' and shuts down;
# the client reads that line back and then expects end-of-stream, either as
# an empty read or as greenio.SSL.ZeroReturnError.
# NOTE(review): embedded lines 62, 66-67, 69-70, 75-76, 78, 82, 84, 88 and
# 90+ are elided in this extract (including the `try:` that pairs with the
# `except` below) -- confirm against the full file.
# NOTE(review): skip_if_no_ssl is imported at the top of the file but no
# decorator line is visible here; presumably it sits on elided line 59.
60 def test_connect_ssl(self):
# Server side: accept one connection, send the greeting, then shut down the
# connection and the listening socket via greenio.shutdown_safe.
61 def accept_once(listenfd):
63 conn, addr = listenfd.accept()
64 conn.write(b'hello\r\n')
65 greenio.shutdown_safe(conn)
68 greenio.shutdown_safe(listenfd)
71 server = eventlet.wrap_ssl(
72 eventlet.listen(('0.0.0.0', 0)),
73 self.private_key_file,
74 self.certificate_file,
77 eventlet.spawn_n(accept_once, server)
79 raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
# NOTE(review): the stdlib `ssl.wrap_socket` was removed in Python 3.12;
# check that `eventlet.green.ssl` still provides it on every interpreter this
# suite targets.
80 client = ssl.wrap_socket(raw_client)
81 fd = client.makefile('rb', 8192)
83 assert fd.readline() == b'hello\r\n'
# After the greeting the stream should be exhausted: an empty read is the
# expected result on this path.
85 self.assertEqual(b'', fd.read(10))
86 except greenio.SSL.ZeroReturnError:
87 # if it's a GreenSSL object it'll do this
89 greenio.shutdown_safe(client)
# hubs.trampoline() with a 1 ms timeout on a connected client socket must
# raise eventlet.TimeoutError; reaching the final assert means it did not.
# NOTE(review): embedded lines 97-98, 100, 102-103, 107-108 and 110+ are
# elided in this extract; the `def server(sock):` header, the `try:` and the
# cleanup are presumably among them -- confirm.
94 def test_001_trampoline_timeout(self):
95 server_sock = eventlet.listen(('127.0.0.1', 0))
96 bound_port = server_sock.getsockname()[1]
# Body of the (elided) server helper: in the lines visible here it only
# accepts the connection and never sends data.
99 client, addr = sock.accept()
101 server_evt = spawn(server, server_sock)
104 desc = eventlet.connect(('127.0.0.1', bound_port))
# Wait for readability only, with a 0.001 s timeout.
105 hubs.trampoline(desc, read=True, write=False, timeout=0.001)
106 except eventlet.TimeoutError:
109 assert False, "Didn't timeout"
# Counterpart to the trampoline-timeout test: here the trampoline is given a
# 0.1 s timeout and an eventlet.TimeoutError is treated as a failure
# ("Timed out").
# NOTE(review): embedded lines 117-119, 121, 123-125, 127, 131-135, 137,
# 139-140 and 142+ are elided in this extract; the definition of `go`, the
# `try:` header and whatever makes the socket readable before the timeout are
# presumably among them -- confirm.
114 def test_timeout_cancel(self):
115 server = eventlet.listen(('0.0.0.0', 0))
116 bound_port = server.getsockname()[1]
# Server-side helper: accepts one connection (the rest of its body is not
# visible here).
120 def client_closer(sock):
122 (conn, addr) = sock.accept()
126 desc = eventlet.connect(('127.0.0.1', bound_port))
128 hubs.trampoline(desc, read=True, timeout=0.1)
129 except eventlet.TimeoutError:
130 assert False, "Timed out"
# Schedule `go` with a delay of 0, then start the server-side helper.
136 greenthread.spawn_after_local(0, go)
138 server_coro = eventlet.spawn(client_closer, server)
# Stop the server greenthread once the test body is done.
141 eventlet.kill(server_coro)
# Tracks the progress of a spawned greenthread through the `state` list and
# asserts the sequence 'start' -> 'except' -> 'finished' at successive points.
# NOTE(review): embedded lines 146-150, 153, 156, 159, 161 and 165 are elided
# in this extract; the definitions of DELAY, `state` and `test`, the
# try/except around the sleep, and the kill call are presumably among them --
# confirm.
145 def test_killing_dormant(self):
151 state.append('start')
152 eventlet.sleep(DELAY)
154 state.append('except')
155 # catching GreenletExit
157 # when switching to hub, hub makes itself the parent of this greenlet,
158 # thus after the function's done, the control will go to the parent
160 state.append('finished')
162 g = eventlet.spawn(test)
# Sleep for half of DELAY: the greenthread has recorded 'start' and is still
# inside its own sleep(DELAY).
163 eventlet.sleep(DELAY / 2)
164 self.assertEqual(state, ['start'])
166 # will not get there, unless switching is explicitly scheduled by kill
167 self.assertEqual(state, ['start', 'except'])
168 eventlet.sleep(DELAY)
169 self.assertEqual(state, ['start', 'except', 'finished'])
# Nests two with_timeout calls: the inner one (0.2 s, timeout_value=1) wraps
# a 2 s sleep, the outer one (0.1 s, no timeout_value) wraps `func`. The test
# expects the outer call to raise eventlet.TimeoutError and fails otherwise.
# NOTE(review): embedded lines 172, 174-175 and 179+ are elided in this
# extract; the `def func():` header, the `try:` and the body of the `except`
# are presumably among them -- confirm.
171 def test_nested_with_timeout(self):
173 return eventlet.with_timeout(0.2, eventlet.sleep, 2, timeout_value=1)
176 eventlet.with_timeout(0.1, func)
177 self.fail(u'Expected TimeoutError')
178 except eventlet.TimeoutError:
186 if __name__ == '__main__':