4 from eventlet import debug, event
5 from eventlet.green import socket
6 from eventlet.support import six
7 from tests import LimitedTestCase, skip_if_no_ssl
10 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
11 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
14 class TestServe(LimitedTestCase):
16 super(TestServe, self).setUp()
17 debug.hub_exceptions(False)
20 super(TestServe, self).tearDown()
21 debug.hub_exceptions(True)
23 def test_exiting_server(self):
24 # tests that the server closes the client sock on handle() exit
25 def closer(sock, addr):
28 l = eventlet.listen(('localhost', 0))
29 gt = eventlet.spawn(eventlet.serve, l, closer)
30 client = eventlet.connect(('localhost', l.getsockname()[1]))
32 self.assertFalse(client.recv(100))
35 def test_excepting_server(self):
36 # tests that the server closes the client sock on handle() exception
37 def crasher(sock, addr):
41 l = eventlet.listen(('localhost', 0))
42 gt = eventlet.spawn(eventlet.serve, l, crasher)
43 client = eventlet.connect(('localhost', l.getsockname()[1]))
45 self.assertRaises(ZeroDivisionError, gt.wait)
46 self.assertFalse(client.recv(100))
48 def test_excepting_server_already_closed(self):
49 # same as above but with explicit clsoe before crash
50 def crasher(sock, addr):
55 l = eventlet.listen(('localhost', 0))
56 gt = eventlet.spawn(eventlet.serve, l, crasher)
57 client = eventlet.connect(('localhost', l.getsockname()[1]))
59 self.assertRaises(ZeroDivisionError, gt.wait)
60 self.assertFalse(client.recv(100))
62 def test_called_for_each_connection(self):
65 def counter(sock, addr):
67 l = eventlet.listen(('localhost', 0))
68 gt = eventlet.spawn(eventlet.serve, l, counter)
69 for i in six.moves.range(100):
70 client = eventlet.connect(('localhost', l.getsockname()[1]))
71 self.assertFalse(client.recv(100))
73 self.assertEqual(100, hits[0])
75 def test_blocking(self):
76 l = eventlet.listen(('localhost', 0))
77 x = eventlet.with_timeout(
79 eventlet.serve, l, lambda c, a: None,
80 timeout_value="timeout")
81 self.assertEqual(x, "timeout")
83 def test_raising_stopserve(self):
84 def stopit(conn, addr):
85 raise eventlet.StopServe()
86 l = eventlet.listen(('localhost', 0))
87 # connect to trigger a call to stopit
88 gt = eventlet.spawn(eventlet.connect, ('localhost', l.getsockname()[1]))
89 eventlet.serve(l, stopit)
92 def test_concurrency(self):
95 def waiter(sock, addr):
98 l = eventlet.listen(('localhost', 0))
99 eventlet.spawn(eventlet.serve, l, waiter, 5)
102 c = eventlet.connect(('localhost', l.getsockname()[1]))
103 # verify the client is connected by getting data
104 self.assertEqual(b'hi', c.recv(2))
106 [test_client() for i in range(5)]
107 # very next client should not get anything
108 x = eventlet.with_timeout(
111 timeout_value="timed out")
112 self.assertEqual(x, "timed out")
115 def test_wrap_ssl(self):
116 server = eventlet.wrap_ssl(
117 eventlet.listen(('localhost', 0)),
118 certfile=certificate_file, keyfile=private_key_file,
120 port = server.getsockname()[1]
122 def handle(sock, addr):
123 sock.sendall(sock.recv(1024))
124 raise eventlet.StopServe()
126 eventlet.spawn(eventlet.serve, server, handle)
127 client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
128 client.sendall(b"echo")
129 self.assertEqual(b"echo", client.recv(1024))
131 def test_socket_reuse(self):
132 lsock1 = eventlet.listen(('localhost', 0))
133 port = lsock1.getsockname()[1]
136 return eventlet.listen(('localhost', port))
138 self.assertRaises(socket.error, same_socket)