import os import socket from unittest import TestCase, main import warnings import eventlet from eventlet import greenio, util, hubs, greenthread, spawn from tests import skip_if_no_ssl warnings.simplefilter('ignore', DeprecationWarning) from eventlet import api warnings.simplefilter('default', DeprecationWarning) def check_hub(): # Clear through the descriptor queue api.sleep(0) api.sleep(0) hub = hubs.get_hub() for nm in 'get_readers', 'get_writers': dct = getattr(hub, nm)() assert not dct, "hub.%s not empty: %s" % (nm, dct) # Stop the runloop (unless it's twistedhub which does not support that) if not getattr(hub, 'uses_twisted_reactor', None): hub.abort(True) assert not hub.running class TestApi(TestCase): certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') def test_tcp_listener(self): socket = eventlet.listen(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' socket.close() check_hub() def test_connect_tcp(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() fd = conn.makefile(mode='wb') conn.close() fd.write(b'hello\n') fd.close() finally: listenfd.close() server = eventlet.listen(('0.0.0.0', 0)) api.spawn(accept_once, server) client = eventlet.connect(('127.0.0.1', server.getsockname()[1])) fd = client.makefile('rb') client.close() assert fd.readline() == b'hello\n' assert fd.read() == b'' fd.close() check_hub() @skip_if_no_ssl def test_connect_ssl(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() conn.write(b'hello\r\n') greenio.shutdown_safe(conn) conn.close() finally: greenio.shutdown_safe(listenfd) listenfd.close() server = api.ssl_listener(('0.0.0.0', 0), self.certificate_file, self.private_key_file) api.spawn(accept_once, server) raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1])) client = util.wrap_ssl(raw_client) fd = socket._fileobject(client, 'rb', 8192) assert fd.readline() == b'hello\r\n' try: self.assertEqual(b'', fd.read(10)) except greenio.SSL.ZeroReturnError: # if it's a GreenSSL object it'll do this pass greenio.shutdown_safe(client) client.close() check_hub() def test_001_trampoline_timeout(self): server_sock = eventlet.listen(('127.0.0.1', 0)) bound_port = server_sock.getsockname()[1] def server(sock): client, addr = sock.accept() api.sleep(0.1) server_evt = spawn(server, server_sock) api.sleep(0) try: desc = eventlet.connect(('127.0.0.1', bound_port)) api.trampoline(desc, read=True, write=False, timeout=0.001) except api.TimeoutError: pass # test passed else: assert False, "Didn't timeout" server_evt.wait() check_hub() def test_timeout_cancel(self): server = eventlet.listen(('0.0.0.0', 0)) bound_port = server.getsockname()[1] done = [False] def client_closer(sock): while True: (conn, addr) = sock.accept() conn.close() def go(): desc = eventlet.connect(('127.0.0.1', bound_port)) try: api.trampoline(desc, read=True, timeout=0.1) except api.TimeoutError: assert False, "Timed out" server.close() desc.close() done[0] = True greenthread.spawn_after_local(0, go) server_coro = api.spawn(client_closer, server) while not done[0]: api.sleep(0) api.kill(server_coro) check_hub() def test_named(self): named_foo = api.named('tests.api_test.Foo') self.assertEqual(named_foo.__name__, "Foo") def test_naming_missing_class(self): self.assertRaises( ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo') def test_killing_dormant(self): DELAY = 0.1 state = [] def test(): try: state.append('start') api.sleep(DELAY) except: state.append('except') # catching GreenletExit pass # when switching to hub, hub makes itself the parent of this greenlet, # thus after the function's done, the control will go to the parent api.sleep(0) state.append('finished') g = api.spawn(test) api.sleep(DELAY / 2) self.assertEqual(state, ['start']) api.kill(g) # will not get there, unless switching is explicitly scheduled by kill self.assertEqual(state, ['start', 'except']) api.sleep(DELAY) self.assertEqual(state, ['start', 'except', 'finished']) def test_nested_with_timeout(self): def func(): return api.with_timeout(0.2, api.sleep, 2, timeout_value=1) try: api.with_timeout(0.1, func) self.fail(u'Expected api.TimeoutError') except api.TimeoutError: pass class Foo(object): pass if __name__ == '__main__': main()