1 from eventlet import patcher
2 from eventlet.green import asyncore
3 from eventlet.green import select
4 from eventlet.green import socket
5 from eventlet.green import threading
6 from eventlet.green import time
# Load the "test.test_asyncore" module through eventlet's patcher, handing it
# this module's globals().
# NOTE(review): names used further down (HelperFunctionTests, BaseTestAPI,
# TestAPI_UseSelect) are not defined on any line shown here, so inject()
# presumably copies the test module's contents into these globals, built
# against the green modules imported above -- confirm in the patcher docs.
8 patcher.inject("test.test_asyncore", globals())
# Replacement for HelperFunctionTests.closeall_check (it is assigned to that
# attribute immediately after this function).
#
# Parameters:
#   self       -- the test case instance; only its assertEqual() is used here.
#   usedefault -- not read on any visible line; presumably selects between the
#                 asyncore.socket_map path and the explicit-map path below.
#
# NOTE(review): this listing has gaps. `c`, the initial `testmap`, and the
# branching on `usedefault` are introduced on lines that are not shown, so
# confirm against the complete file before changing any logic.
11 def new_closeall_check(self, usedefault):
12 # Check that close_all() closes everything in a given map
# Precondition: the socket behind `c` must not be closed yet.
19 self.assertEqual(c.socket.closed, False)
23 # the only change we make is to not assign to asyncore.socket_map
24 # because doing so fails to assign to the real asyncore's socket_map
25 # and thus the test fails
# Instead of rebinding the name, the existing dict is mutated in place:
# snapshot its current contents so they can be put back afterwards ...
26 socketmap = asyncore.socket_map.copy()
# ... then swap the test entries into the shared map.
28 asyncore.socket_map.clear()
29 asyncore.socket_map.update(testmap)
# Capture whatever is left in the shared map so the length assertion below
# inspects it, then restore the contents saved in `socketmap`.
32 testmap = asyncore.socket_map.copy()
33 asyncore.socket_map.clear()
34 asyncore.socket_map.update(socketmap)
# Explicit-map variant: close_all() is given `testmap` directly.
36 asyncore.close_all(testmap)
# After close_all() the map is expected to be empty ...
38 self.assertEqual(len(testmap), 0)
# ... and the socket behind `c` is expected to be closed.
41 self.assertEqual(c.socket.closed, True)
# Install the in-place variant defined above in place of the test class's own
# closeall_check.
43 HelperFunctionTests.closeall_check = new_closeall_check
46 # Eventlet's select() emulation doesn't support the POLLPRI flag,
47 # which this test relies on. Therefore, nuke it!
# "Nuke" here means: replace the test method with a lambda that accepts any
# arguments and returns None, so the test body never runs.
48 BaseTestAPI.test_handle_expt = lambda *a, **kw: None
53 # temporarily disabling these tests in the python2.7/pyevent configuration
54 from tests import using_pyevent
# NOTE(review): `sys` is not imported on any line visible in this listing;
# confirm the full file imports it before this condition is evaluated.
56 if using_pyevent(None) and sys.version_info >= (2, 7):
# Same no-op replacement as above, applied to three TestAPI_UseSelect tests.
57 TestAPI_UseSelect.test_handle_accept = lambda *a, **kw: None
58 TestAPI_UseSelect.test_handle_close = lambda *a, **kw: None
59 TestAPI_UseSelect.test_handle_read = lambda *a, **kw: None
63 if __name__ == "__main__":