Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / api_test.py
1 import os
2 from unittest import TestCase, main
3
4 from nose.tools import eq_
5
6 import eventlet
7 from eventlet import greenio, hubs, greenthread, spawn
8 from eventlet.green import ssl
9 from tests import skip_if_no_ssl
10
11
def check_hub():
    """Assert that the current hub is idle, then abort it.

    Calls ``eventlet.sleep(0)`` twice to clear through the descriptor queue,
    asserts that both ``get_readers()`` and ``get_writers()`` of the current
    hub return something empty, calls ``hub.abort(True)`` and finally asserts
    that the hub no longer reports itself as running.

    Raises:
        AssertionError: if a reader or writer is still registered, or if the
            hub is still running after the abort.
    """
    # Clear through the descriptor queue
    for _ in range(2):
        eventlet.sleep(0)

    current_hub = hubs.get_hub()
    for accessor_name in ('get_readers', 'get_writers'):
        registered = getattr(current_hub, accessor_name)()
        assert not registered, "hub.%s not empty: %s" % (accessor_name, registered)

    current_hub.abort(True)
    assert not current_hub.running
22
23
class TestApi(TestCase):
    """Tests for eventlet's top-level listen/connect helpers, trampoline
    timeouts, ``kill`` and ``with_timeout``.

    The socket tests finish with ``check_hub()``, which asserts that no
    readers or writers are left registered on the hub.

    Test methods are documented with comments rather than docstrings so that
    the names shown by the test runner stay unchanged.
    """

    # Certificate / private key files looked up next to this test module;
    # used by test_connect_ssl.
    certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
    private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

    def test_tcp_listener(self):
        # A listener bound to the wildcard address must report that address.
        listener = eventlet.listen(('0.0.0.0', 0))
        try:
            self.assertEqual(listener.getsockname()[0], '0.0.0.0')
        finally:
            # Close even when the assertion fails so the socket is not leaked.
            listener.close()

        check_hub()

    def test_connect_tcp(self):
        # Round-trip one line over a plain TCP connection.
        def accept_once(listenfd):
            try:
                conn, _addr = listenfd.accept()
                fd = conn.makefile(mode='wb')
                # The socket object is closed *before* writing on purpose:
                # the file object made from it is still used afterwards.
                conn.close()
                fd.write(b'hello\n')
                fd.close()
            finally:
                listenfd.close()

        server = eventlet.listen(('0.0.0.0', 0))
        eventlet.spawn_n(accept_once, server)

        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        fd = client.makefile('rb')
        # Same pattern on the client side: read through the file object after
        # the socket object itself has been closed.
        client.close()
        eq_(fd.readline(), b'hello\n')
        eq_(fd.read(), b'')
        fd.close()

        check_hub()

    @skip_if_no_ssl
    def test_connect_ssl(self):
        # Round-trip one line over an SSL-wrapped connection.
        def accept_once(listenfd):
            try:
                conn, _addr = listenfd.accept()
                conn.write(b'hello\r\n')
                greenio.shutdown_safe(conn)
                conn.close()
            finally:
                greenio.shutdown_safe(listenfd)
                listenfd.close()

        server = eventlet.wrap_ssl(
            eventlet.listen(('0.0.0.0', 0)),
            self.private_key_file,
            self.certificate_file,
            server_side=True
        )
        eventlet.spawn_n(accept_once, server)

        raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        client = ssl.wrap_socket(raw_client)
        fd = client.makefile('rb', 8192)

        # assertEqual instead of a bare assert: survives ``python -O`` and
        # reports both values on failure.
        self.assertEqual(fd.readline(), b'hello\r\n')
        try:
            self.assertEqual(b'', fd.read(10))
        except greenio.SSL.ZeroReturnError:
            # if it's a GreenSSL object it'll do this
            pass
        greenio.shutdown_safe(client)
        client.close()

        check_hub()

    def test_001_trampoline_timeout(self):
        # trampoline() on a connection that never becomes readable within the
        # timeout must raise eventlet.TimeoutError.
        server_sock = eventlet.listen(('127.0.0.1', 0))
        bound_port = server_sock.getsockname()[1]

        def server(sock):
            client, _addr = sock.accept()
            try:
                # Keep the connection open and silent for much longer than
                # the 0.001s trampoline timeout used by the client below.
                eventlet.sleep(0.1)
            finally:
                client.close()

        server_evt = spawn(server, server_sock)
        eventlet.sleep(0)

        # No timeout is active while connecting, so only the trampoline call
        # sits inside the try block that expects TimeoutError.
        desc = eventlet.connect(('127.0.0.1', bound_port))
        try:
            hubs.trampoline(desc, read=True, write=False, timeout=0.001)
        except eventlet.TimeoutError:
            pass  # test passed
        else:
            # self.fail rather than ``assert False`` so the check is not
            # stripped when running under ``python -O``.
            self.fail("Didn't timeout")
        finally:
            desc.close()

        server_evt.wait()
        server_sock.close()
        check_hub()

    def test_timeout_cancel(self):
        # A trampoline() that completes before its timeout must not raise
        # TimeoutError afterwards.
        server = eventlet.listen(('0.0.0.0', 0))
        bound_port = server.getsockname()[1]

        # One-element list used as a mutable flag shared with the closure.
        done = [False]
        # Failure messages collected inside the ``go`` greenthread.  An
        # exception raised there would not reach this test method, so the
        # outcome is reported from here instead.
        failures = []

        def client_closer(sock):
            # Accept and immediately close every incoming connection.
            while True:
                (conn, _addr) = sock.accept()
                conn.close()

        def go():
            desc = None
            try:
                desc = eventlet.connect(('127.0.0.1', bound_port))
                hubs.trampoline(desc, read=True, timeout=0.1)
            except eventlet.TimeoutError:
                failures.append("Timed out")
            except Exception as exc:
                failures.append("Unexpected error: %r" % (exc,))
            finally:
                server.close()
                if desc is not None:
                    desc.close()
                # Always release the waiting loop below, even on failure;
                # otherwise the test would spin forever.
                done[0] = True

        greenthread.spawn_after_local(0, go)

        server_coro = eventlet.spawn(client_closer, server)
        while not done[0]:
            eventlet.sleep(0)
        eventlet.kill(server_coro)

        if failures:
            self.fail(failures[0])

        check_hub()

    def test_killing_dormant(self):
        # Killing a greenthread that is sleeping must raise inside it at once,
        # and the greenthread must still be able to run to completion.
        DELAY = 0.1
        state = []

        def dormant():
            try:
                state.append('start')
                eventlet.sleep(DELAY)
            except BaseException:
                # Deliberately as broad as a bare ``except``: this is where
                # the GreenletExit thrown by eventlet.kill() is caught.
                state.append('except')
            # when switching to hub, hub makes itself the parent of this greenlet,
            # thus after the function's done, the control will go to the parent
            eventlet.sleep(0)
            state.append('finished')

        g = eventlet.spawn(dormant)
        eventlet.sleep(DELAY / 2)
        self.assertEqual(state, ['start'])
        eventlet.kill(g)
        # will not get there, unless switching is explicitly scheduled by kill
        self.assertEqual(state, ['start', 'except'])
        eventlet.sleep(DELAY)
        self.assertEqual(state, ['start', 'except', 'finished'])

    def test_nested_with_timeout(self):
        # The inner with_timeout (0.2s, with a timeout_value) is wrapped in an
        # outer with_timeout (0.1s, no timeout_value).  The test expects the
        # outer call to raise TimeoutError.
        def func():
            return eventlet.with_timeout(0.2, eventlet.sleep, 2, timeout_value=1)

        try:
            eventlet.with_timeout(0.1, func)
        except eventlet.TimeoutError:
            pass
        else:
            self.fail(u'Expected TimeoutError')
180
181
class Foo(object):
    """Empty placeholder class with no attributes or methods of its own.

    Nothing in the visible part of this module references it.
    """
184
185
# Script entry point: when this module is executed directly (not imported),
# hand control to ``main`` imported from ``unittest`` at the top of the file.
if __name__ == '__main__':
    main()