1 from tests import requires_twisted
5 from twisted.internet import reactor
6 from twisted.internet.error import ConnectionDone
7 import eventlet.twistedutil.protocol as pr
8 from eventlet.twistedutil.protocols.basic import LineOnlyReceiverTransport
# Fallback stubs on the `pr` namespace: both transport classes become None and
# GreenClientCreator becomes a callable that accepts any arguments and returns
# None.
# NOTE(review): the statement guarding these assignments is not visible in this
# excerpt -- presumably an import-failure branch; confirm against the full file.
10 # stub out some of the twisted dependencies so it at least imports
14 pr.UnbufferedTransport = None
15 pr.GreenTransport = None
16 pr.GreenClientCreator = lambda *a, **k: None
# Placeholder class bound to the name `reactor`, the same name that is imported
# from twisted.internet at the top of the file.
# NOTE(review): the class body and the condition under which it is defined are
# not visible in this excerpt.
18 class reactor(object):
21 from eventlet import spawn, sleep, with_timeout, spawn_after
22 from eventlet.coros import Event
25 from eventlet.green import socket
# Plain-socket test server; only defined when the green `socket` module is
# available.
31 if socket is not None:
# Listen on 127.0.0.1, accept a connection, read one line and answer it.
#   delay -- base time unit: the listening socket times out after delay * 3 and
#            the accepted connection after delay + 1.
#   port  -- port to bind; the bound port is read back from getsockname(), so
#            passing 0 yields whatever port the OS assigned.
# It takes `self` because test classes install it as their `setup_server`
# attribute.
# NOTE(review): creation of `s`, the try/except bodies and the return statement
# are not visible in this excerpt.
32 def setup_server_socket(self, delay=DELAY, port=0):
34 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
35 s.bind(('127.0.0.1', port))
36 port = s.getsockname()[1]
38 s.settimeout(delay * 3)
41 conn, addr = s.accept()
42 conn.settimeout(delay + 1)
# Drop the last two characters of the received line; the tests in this file
# terminate the lines they send with '\r\n'.
44 hello = conn.makefile().readline()[:-2]
45 except socket.timeout:
47 conn.sendall('you said %s. ' % hello)
# Twisted-based test server: listens with a pr.SpawnFactory that runs `handle`
# on LineOnlyReceiverTransport connections, and returns the listening port
# number.
# The visible part of the handler reads one line from the connection and writes
# back 'you said <line>. '.
# NOTE(review): the `handle` definition and the ConnectionDone handler body are
# not visible in this excerpt.
# NOTE(review): the `port` parameter is not used by the visible code --
# listenTCP is called with a literal 0 and `port` is then rebound to the
# listening port object. Confirm whether honouring the argument was intended.
56 def setup_server_SpawnFactory(self, delay=DELAY, port=0):
60 hello = conn.readline()
61 except ConnectionDone:
63 conn.write('you said %s. ' % hello)
68 port = reactor.listenTCP(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport))
69 return port.getHost().port
# Shared base for the transport tests. Subclasses provide `gtransportClass` and
# `setup_server`; the code below connects `self.conn` to the server they start.
72 class TestCase(unittest.TestCase):
# Buffer size handed to the client creator; None skips the bufferSize check
# performed after connecting.
73 transportBufferSize = None
# Builds the client creator from the subclass-provided transport class and
# buffer size.
# NOTE(review): the enclosing def is not visible here; the code below reads the
# result as `self.connector`.
77 return pr.GreenClientCreator(reactor, self.gtransportClass, self.transportBufferSize)
# Start the server, connect to the port it reports, and verify that the
# requested buffer size reached the underlying transport.
# NOTE(review): the enclosing method header is not visible -- presumably setUp.
81 port = self.setup_server()
82 self.conn = self.connector.connectTCP('127.0.0.1', port)
83 if self.transportBufferSize is not None:
84 self.assertEqual(self.transportBufferSize, self.conn.transport.bufferSize)
# Exercises pr.UnbufferedTransport against the SpawnFactory-based server.
87 class TestUnbufferedTransport(TestCase):
88 gtransportClass = pr.UnbufferedTransport
89 setup_server = setup_server_SpawnFactory
# A single read() is expected to return the complete reply; further reads are
# expected to return the empty string.
92 def test_full_read(self):
93 self.conn.write('hello\r\n')
94 self.assertEqual(self.conn.read(), 'you said hello. BYE')
95 self.assertEqual(self.conn.read(), '')
96 self.assertEqual(self.conn.read(), '')
# Iterating over the connection and joining the pieces is expected to
# reproduce the complete reply.
99 def test_iterator(self):
100 self.conn.write('iterator\r\n')
101 self.assertEqual('you said iterator. BYE', ''.join(self.conn))
# Re-runs the inherited TestUnbufferedTransport tests with a transport buffer
# size of 1.
104 class TestUnbufferedTransport_bufsize1(TestUnbufferedTransport):
105 transportBufferSize = 1
106 setup_server = setup_server_SpawnFactory
# Exercises pr.GreenTransport: sized read()/recv() calls, iteration, and
# pausing/resuming of the producer. Also inherits the TestUnbufferedTransport
# tests.
109 class TestGreenTransport(TestUnbufferedTransport):
110 gtransportClass = pr.GreenTransport
111 setup_server = setup_server_SpawnFactory
# Sized reads: read(9) is expected to return exactly the first nine characters,
# read(999) the remainder, and every later read()/recv() the empty string.
# NOTE(review): the def line of this test is not visible in this excerpt.
115 self.conn.write('hello\r\n')
116 self.assertEqual(self.conn.read(9), 'you said ')
117 self.assertEqual(self.conn.read(999), 'hello. BYE')
118 self.assertEqual(self.conn.read(9), '')
119 self.assertEqual(self.conn.read(1), '')
120 self.assertEqual(self.conn.recv(9), '')
121 self.assertEqual(self.conn.recv(1), '')
# Unsized read() is expected to return the whole reply; later read()/recv()
# calls are expected to return the empty string.
124 def test_read2(self):
125 self.conn.write('world\r\n')
126 self.assertEqual(self.conn.read(), 'you said world. BYE')
127 self.assertEqual(self.conn.read(), '')
128 self.assertEqual(self.conn.recv(), '')
131 def test_iterator(self):
132 self.conn.write('iterator\r\n')
133 self.assertEqual('you said iterator. BYE', ''.join(self.conn))
# Names starting with 'test_' that exist in the class namespace at this point,
# i.e. the tests defined above; test_resume_producing replays them.
135 _tests = [x for x in locals().keys() if x.startswith('test_')]
# Calls resumeProducing() on the connection and then runs each test collected
# in _tests.
# NOTE(review): one statement of the loop body is not visible in this excerpt.
138 def test_resume_producing(self):
139 for test in self._tests:
141 self.conn.resumeProducing()
142 getattr(self, test)()
# With the producer paused, the read is expected not to complete within
# DELAY * 10, so with_timeout yields the 'timed out' placeholder value.
145 def test_pause_producing(self):
146 self.conn.pauseProducing()
147 self.conn.write('hi\r\n')
148 result = with_timeout(DELAY * 10, self.conn.read, timeout_value='timed out')
149 self.assertEqual('timed out', result)
# The producer is paused, then resumed after DELAY * 5; the read is expected to
# return the whole reply before the DELAY * 10 timeout fires.
152 def test_pauseresume_producing(self):
153 self.conn.pauseProducing()
154 spawn_after(DELAY * 5, self.conn.resumeProducing)
155 self.conn.write('hi\r\n')
156 result = with_timeout(DELAY * 10, self.conn.read, timeout_value='timed out')
157 self.assertEqual('you said hi. BYE', result)
# Re-runs the inherited TestGreenTransport tests with a transport buffer size
# of 1.
160 class TestGreenTransport_bufsize1(TestGreenTransport):
161 transportBufferSize = 1
163 # class TestGreenTransportError(TestCase):
164 # setup_server = setup_server_SpawnFactory
165 # gtransportClass = pr.GreenTransport
167 # def test_read_error(self):
168 # self.conn.write('hello\r\n')
169 # sleep(DELAY*1.5) # make sure the rest of data arrives
173 # self.conn.loseConnection(failure.Failure()) # does not work, why?
174 # spawn(self.conn._queue.send_exception, *sys.exc_info())
175 # self.assertEqual(self.conn.read(9), 'you said ')
176 # self.assertEqual(self.conn.read(7), 'hello. ')
177 # self.assertEqual(self.conn.read(9), 'BYE')
178 # self.assertRaises(ZeroDivisionError, self.conn.read, 9)
179 # self.assertEqual(self.conn.read(1), '')
180 # self.assertEqual(self.conn.read(1), '')
182 # def test_recv_error(self):
183 # self.conn.write('hello')
184 # self.assertEqual('you said hello. ', self.conn.recv())
185 # sleep(DELAY*1.5) # make sure the rest of data arrives
189 # self.conn.loseConnection(failure.Failure()) # does not work, why?
190 # spawn(self.conn._queue.send_exception, *sys.exc_info())
191 # self.assertEqual('BYE', self.conn.recv())
192 # self.assertRaises(ZeroDivisionError, self.conn.recv, 9)
193 # self.assertEqual('', self.conn.recv(1))
194 # self.assertEqual('', self.conn.recv())
# Variants of the transport test classes that use the plain-socket server
# (setup_server_socket) instead of the SpawnFactory one. They are defined only
# when the green `socket` module is available.
197 if socket is not None:
# UnbufferedTransport tests against the plain-socket server.
199 class TestUnbufferedTransport_socketserver(TestUnbufferedTransport):
200 setup_server = setup_server_socket
# Same, with a transport buffer size of 1.
202 class TestUnbufferedTransport_socketserver_bufsize1(TestUnbufferedTransport):
203 transportBufferSize = 1
204 setup_server = setup_server_socket
# GreenTransport tests against the plain-socket server.
206 class TestGreenTransport_socketserver(TestGreenTransport):
207 setup_server = setup_server_socket
# Same, with a transport buffer size of 1.
209 class TestGreenTransport_socketserver_bufsize1(TestGreenTransport):
210 transportBufferSize = 1
211 setup_server = setup_server_socket
# TLS failure case: a server-side protocol instance is created but its
# connectionMade is never called, so the server handler must not run.
214 class TestTLSError(unittest.TestCase):
# Listens with TLS and connects a TLS client using credentials built from
# X509Credentials(None, None), then asserts that the event `ev` was never sent,
# i.e. that `handle` was not invoked.
# NOTE(review): creation of `ev`, the `handle` definition and the statements
# around connectTLS are not visible in this excerpt.
216 def test_server_connectionMade_never_called(self):
217 # trigger case when protocol instance is created,
218 # but its connectionMade is never called
219 from gnutls.interfaces.twisted import X509Credentials
220 from gnutls.errors import GNUTLSError
221 cred = X509Credentials(None, None)
225 ev.send("handle must not be called")
226 s = reactor.listenTLS(0, pr.SpawnFactory(handle, LineOnlyReceiverTransport), cred)
227 creator = pr.GreenClientCreator(reactor, LineOnlyReceiverTransport)
229 conn = creator.connectTLS('127.0.0.1', s.getHost().port, cred)
# poll() returning None means nothing was sent on the event.
232 assert ev.poll() is None, repr(ev.poll())
235 import gnutls.interfaces.twisted
244 if __name__ == '__main__':