3 from unittest import TestCase, main
7 from eventlet import greenio, util, hubs, greenthread, spawn
8 from tests import skip_if_no_ssl
10 warnings.simplefilter('ignore', DeprecationWarning)
11 from eventlet import api
12 warnings.simplefilter('default', DeprecationWarning)
# NOTE(review): the embedded line numbers jump (16 -> 20, 24 -> 26), so this
# listing omits lines; the enclosing `def` and the statement that binds `hub`
# are not visible here.
16 # Clear through the descriptor queue
# Whatever hub.get_readers() and hub.get_writers() return must be falsy; the
# assertion message names the accessor that still holds entries.
20 for nm in 'get_readers', 'get_writers':
21 dct = getattr(hub, nm)()
22 assert not dct, "hub.%s not empty: %s" % (nm, dct)
23 # Stop the runloop (unless it's twistedhub which does not support that)
24 if not getattr(hub, 'uses_twisted_reactor', None):
# The stop call itself (line 25) is elided; afterwards the hub must report
# that it is no longer running.
26 assert not hub.running
# Test case exercising the `api` module together with eventlet.listen /
# eventlet.connect.
29 class TestApi(TestCase):
# Certificate and private key are looked up in the directory containing this
# test module; test_connect_ssl passes both paths to api.ssl_listener.
31 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
32 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
34 def test_tcp_listener(self):
# Listen on ('0.0.0.0', 0) and check that the bound address is reported back
# as '0.0.0.0'.
# NOTE(review): the local name `socket` would shadow a module-level `socket`
# inside this method (test_connect_ssl references `socket._fileobject`);
# harmless here, but easy to misread.
35 socket = eventlet.listen(('0.0.0.0', 0))
36 assert socket.getsockname()[0] == '0.0.0.0'
# NOTE(review): lines 37-40 are missing from this listing; presumably the
# listener is closed there - confirm.
41 def test_connect_tcp(self):
# Server side: accept a single connection and wrap it in a binary write file.
# NOTE(review): lines 43 and 46-51 are missing from this listing; the write of
# the greeting and the close/cleanup calls are presumably there - confirm.
42 def accept_once(listenfd):
44 conn, addr = listenfd.accept()
45 fd = conn.makefile(mode='wb')
# Start the listener on port 0 and serve it from a spawned coroutine.
52 server = eventlet.listen(('0.0.0.0', 0))
53 api.spawn(accept_once, server)
# Client side: connect over loopback to the port the server actually bound.
55 client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
56 fd = client.makefile('rb')
# Expect exactly one greeting line...
58 assert fd.readline() == b'hello\n'
# ...followed by end-of-stream (read() returns empty bytes).
60 assert fd.read() == b''
# NOTE(review): line 65 is missing from this listing; `skip_if_no_ssl` is
# imported at the top of the file and is presumably applied here - confirm.
66 def test_connect_ssl(self):
# Server side: accept one connection, send the greeting, then shut down the
# connection and the listening socket via greenio.shutdown_safe.
# NOTE(review): lines 68, 72-73 and 75-76 are elided (likely try/finally and
# close calls) - confirm.
67 def accept_once(listenfd):
69 conn, addr = listenfd.accept()
70 conn.write(b'hello\r\n')
71 greenio.shutdown_safe(conn)
74 greenio.shutdown_safe(listenfd)
# SSL listener on port 0 using the class-level certificate and key paths.
77 server = api.ssl_listener(('0.0.0.0', 0),
78 self.certificate_file,
79 self.private_key_file)
80 api.spawn(accept_once, server)
# Client side: plain TCP connect over loopback, then wrap the socket in SSL.
82 raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
83 client = util.wrap_ssl(raw_client)
# NOTE(review): `socket._fileobject` is a private helper of the Python 2
# `socket` module and does not exist in Python 3 - confirm which interpreter
# this test targets.
84 fd = socket._fileobject(client, 'rb', 8192)
86 assert fd.readline() == b'hello\r\n'
# After the greeting the stream should be exhausted: either read(10) returns
# empty bytes or ZeroReturnError is raised (the `try:` line 87 is elided).
88 self.assertEqual(b'', fd.read(10))
89 except greenio.SSL.ZeroReturnError:
90 # if it's a GreenSSL object it'll do this
92 greenio.shutdown_safe(client)
97 def test_001_trampoline_timeout(self):
# Listen on loopback with port 0 and remember the port that was bound.
98 server_sock = eventlet.listen(('127.0.0.1', 0))
99 bound_port = server_sock.getsockname()[1]
# NOTE(review): the `def server(sock):` header (line 101) is elided; the
# visible part of its body only accepts a connection.
102 client, addr = sock.accept()
104 server_evt = spawn(server, server_sock)
# Connect, then wait for readability with a 0.001 s timeout; the wait is
# expected to end in api.TimeoutError.
# NOTE(review): the try/else scaffolding (lines 106, 110-111) is elided.
107 desc = eventlet.connect(('127.0.0.1', bound_port))
108 api.trampoline(desc, read=True, write=False, timeout=0.001)
109 except api.TimeoutError:
# Reaching this assertion means trampoline returned without timing out.
112 assert False, "Didn't timeout"
117 def test_timeout_cancel(self):
# Listen on port 0 and remember the port that was bound.
118 server = eventlet.listen(('0.0.0.0', 0))
119 bound_port = server.getsockname()[1]
# Server-side helper: accepts a connection from the listening socket.
# NOTE(review): lines 124 and 126-128 are elided; judging by the name, the
# accepted connection is presumably closed there - confirm.
123 def client_closer(sock):
125 (conn, addr) = sock.accept()
# NOTE(review): the `def go():` header and its `try:` line are elided. The
# visible body connects and waits for readability with a 0.1 s timeout; here
# a timeout is the failure case.
129 desc = eventlet.connect(('127.0.0.1', bound_port))
131 api.trampoline(desc, read=True, timeout=0.1)
132 except api.TimeoutError:
133 assert False, "Timed out"
# Schedule `go` with a delay of 0, then start the server coroutine.
139 greenthread.spawn_after_local(0, go)
141 server_coro = api.spawn(client_closer, server)
# Finally kill the server coroutine (lines 142-143 are elided).
144 api.kill(server_coro)
148 def test_named(self):
# api.named is given a dotted path and the returned object's __name__ must be
# "Foo".
# NOTE(review): `Foo` is presumably defined in an elided part of this module
# (the path names tests.api_test) - confirm.
149 named_foo = api.named('tests.api_test.Foo')
150 self.assertEqual(named_foo.__name__, "Foo")
152 def test_naming_missing_class(self):
# NOTE(review): line 153 is elided; the visible arguments (exception type,
# callable, argument) look like the tail of a self.assertRaises(...) call
# checking that api.named raises ImportError for an unknown module - confirm.
154 ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
156 def test_killing_dormant(self):
# `state` records how far the inner coroutine has progressed; the assertions
# below check it after each step.
# NOTE(review): the lines that create `state`, define the inner function,
# spawn it, sleep and kill it (157-161, 163-164, 167, 170, 172-174, 176, 179)
# are elided from this listing.
162 state.append('start')
165 state.append('except')
166 # catching GreenletExit
168 # when switching to hub, hub makes itself the parent of this greenlet,
169 # thus after the function's done, the control will go to the parent
171 state.append('finished')
# First checkpoint: only 'start' has been recorded.
175 self.assertEqual(state, ['start'])
177 # will not get there, unless switching is explicitly scheduled by kill
# Second checkpoint: the exception handler has run but the function has not
# finished yet.
178 self.assertEqual(state, ['start', 'except'])
# Final checkpoint: the function has run to completion.
180 self.assertEqual(state, ['start', 'except', 'finished'])
182 def test_nested_with_timeout(self):
# NOTE(review): the inner `def func():` header (line 183) and the `try:` line
# are elided. The inner call wraps api.sleep(2) in a 0.2 s timeout with
# timeout_value=1.
184 return api.with_timeout(0.2, api.sleep, 2, timeout_value=1)
# The outer timeout (0.1 s) is shorter than the inner one (0.2 s), and the
# test expects it to surface as api.TimeoutError; reaching self.fail means no
# exception was raised.
187 api.with_timeout(0.1, func)
188 self.fail(u'Expected api.TimeoutError')
189 except api.TimeoutError:
197 if __name__ == '__main__':