Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / api_test.py
1 import os
2 import socket
3 from unittest import TestCase, main
4 import warnings
5
6 import eventlet
7 from eventlet import greenio, util, hubs, greenthread, spawn
8 from tests import skip_if_no_ssl
9
10 warnings.simplefilter('ignore', DeprecationWarning)
11 from eventlet import api
12 warnings.simplefilter('default', DeprecationWarning)
13
14
15 def check_hub():
16     # Clear through the descriptor queue
17     api.sleep(0)
18     api.sleep(0)
19     hub = hubs.get_hub()
20     for nm in 'get_readers', 'get_writers':
21         dct = getattr(hub, nm)()
22         assert not dct, "hub.%s not empty: %s" % (nm, dct)
23     # Stop the runloop (unless it's twistedhub which does not support that)
24     if not getattr(hub, 'uses_twisted_reactor', None):
25         hub.abort(True)
26         assert not hub.running
27
28
29 class TestApi(TestCase):
30
31     certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
32     private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
33
34     def test_tcp_listener(self):
35         socket = eventlet.listen(('0.0.0.0', 0))
36         assert socket.getsockname()[0] == '0.0.0.0'
37         socket.close()
38
39         check_hub()
40
41     def test_connect_tcp(self):
42         def accept_once(listenfd):
43             try:
44                 conn, addr = listenfd.accept()
45                 fd = conn.makefile(mode='wb')
46                 conn.close()
47                 fd.write(b'hello\n')
48                 fd.close()
49             finally:
50                 listenfd.close()
51
52         server = eventlet.listen(('0.0.0.0', 0))
53         api.spawn(accept_once, server)
54
55         client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
56         fd = client.makefile('rb')
57         client.close()
58         assert fd.readline() == b'hello\n'
59
60         assert fd.read() == b''
61         fd.close()
62
63         check_hub()
64
65     @skip_if_no_ssl
66     def test_connect_ssl(self):
67         def accept_once(listenfd):
68             try:
69                 conn, addr = listenfd.accept()
70                 conn.write(b'hello\r\n')
71                 greenio.shutdown_safe(conn)
72                 conn.close()
73             finally:
74                 greenio.shutdown_safe(listenfd)
75                 listenfd.close()
76
77         server = api.ssl_listener(('0.0.0.0', 0),
78                                   self.certificate_file,
79                                   self.private_key_file)
80         api.spawn(accept_once, server)
81
82         raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
83         client = util.wrap_ssl(raw_client)
84         fd = socket._fileobject(client, 'rb', 8192)
85
86         assert fd.readline() == b'hello\r\n'
87         try:
88             self.assertEqual(b'', fd.read(10))
89         except greenio.SSL.ZeroReturnError:
90             # if it's a GreenSSL object it'll do this
91             pass
92         greenio.shutdown_safe(client)
93         client.close()
94
95         check_hub()
96
97     def test_001_trampoline_timeout(self):
98         server_sock = eventlet.listen(('127.0.0.1', 0))
99         bound_port = server_sock.getsockname()[1]
100
101         def server(sock):
102             client, addr = sock.accept()
103             api.sleep(0.1)
104         server_evt = spawn(server, server_sock)
105         api.sleep(0)
106         try:
107             desc = eventlet.connect(('127.0.0.1', bound_port))
108             api.trampoline(desc, read=True, write=False, timeout=0.001)
109         except api.TimeoutError:
110             pass  # test passed
111         else:
112             assert False, "Didn't timeout"
113
114         server_evt.wait()
115         check_hub()
116
117     def test_timeout_cancel(self):
118         server = eventlet.listen(('0.0.0.0', 0))
119         bound_port = server.getsockname()[1]
120
121         done = [False]
122
123         def client_closer(sock):
124             while True:
125                 (conn, addr) = sock.accept()
126                 conn.close()
127
128         def go():
129             desc = eventlet.connect(('127.0.0.1', bound_port))
130             try:
131                 api.trampoline(desc, read=True, timeout=0.1)
132             except api.TimeoutError:
133                 assert False, "Timed out"
134
135             server.close()
136             desc.close()
137             done[0] = True
138
139         greenthread.spawn_after_local(0, go)
140
141         server_coro = api.spawn(client_closer, server)
142         while not done[0]:
143             api.sleep(0)
144         api.kill(server_coro)
145
146         check_hub()
147
148     def test_named(self):
149         named_foo = api.named('tests.api_test.Foo')
150         self.assertEqual(named_foo.__name__, "Foo")
151
152     def test_naming_missing_class(self):
153         self.assertRaises(
154             ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
155
156     def test_killing_dormant(self):
157         DELAY = 0.1
158         state = []
159
160         def test():
161             try:
162                 state.append('start')
163                 api.sleep(DELAY)
164             except:
165                 state.append('except')
166                 # catching GreenletExit
167                 pass
168             # when switching to hub, hub makes itself the parent of this greenlet,
169             # thus after the function's done, the control will go to the parent
170             api.sleep(0)
171             state.append('finished')
172
173         g = api.spawn(test)
174         api.sleep(DELAY / 2)
175         self.assertEqual(state, ['start'])
176         api.kill(g)
177         # will not get there, unless switching is explicitly scheduled by kill
178         self.assertEqual(state, ['start', 'except'])
179         api.sleep(DELAY)
180         self.assertEqual(state, ['start', 'except', 'finished'])
181
182     def test_nested_with_timeout(self):
183         def func():
184             return api.with_timeout(0.2, api.sleep, 2, timeout_value=1)
185
186         try:
187             api.with_timeout(0.1, func)
188             self.fail(u'Expected api.TimeoutError')
189         except api.TimeoutError:
190             pass
191
192
193 class Foo(object):
194     pass
195
196
197 if __name__ == '__main__':
198     main()