2 from eventlet import event, hubs, queue
3 from tests import LimitedTestCase, main
7 eventlet.Timeout(0, RuntimeError())
15 class TestQueue(LimitedTestCase):
16 def test_send_first(self):
19 self.assertEqual(q.get(), 'hi')
21 def test_send_last(self):
25 self.assertEqual(q.get(), 'hi2')
27 gt = eventlet.spawn(eventlet.with_timeout, 0.1, waiter, q)
33 def test_max_size(self):
45 gt = eventlet.spawn(putter, q)
47 self.assertEqual(results, ['a', 'b'])
48 self.assertEqual(q.get(), 'a')
50 self.assertEqual(results, ['a', 'b', 'c'])
51 self.assertEqual(q.get(), 'b')
52 self.assertEqual(q.get(), 'c')
55 def test_zero_max_size(self):
67 gt = eventlet.spawn(sender, evt, q)
69 assert not evt.ready()
70 gt2 = eventlet.spawn(receiver, q)
71 self.assertEqual(gt2.wait(), 'hi')
72 self.assertEqual(evt.wait(), 'done')
75 def test_resize_up(self):
83 gt = eventlet.spawn(sender, evt, q)
85 assert not evt.ready()
91 def test_resize_down(self):
97 self.assertEqual(list(q.queue), list(range(5)))
100 self.assertEqual(list(q.queue), list(range(5)))
102 def test_resize_to_Unlimited(self):
103 q = eventlet.Queue(0)
110 gt = eventlet.spawn(sender, evt, q)
112 self.assertFalse(evt.ready())
115 self.assertTrue(evt.ready())
118 def test_multiple_waiters(self):
119 # tests that multiple waiters get their results back
122 sendings = ['1', '2', '3', '4']
123 gts = [eventlet.spawn(q.get) for x in sendings]
125 eventlet.sleep(0.01) # get 'em all waiting
132 for i, gt in enumerate(gts):
133 results.add(gt.wait())
134 self.assertEqual(results, set(sendings))
136 def test_waiters_that_cancel(self):
139 gt = eventlet.spawn(do_bail, q)
140 self.assertEqual(gt.wait(), 'timed out')
143 self.assertEqual(q.get(), 'hi')
145 def test_getting_before_sending(self):
147 gt = eventlet.spawn(q.put, 'sent')
148 self.assertEqual(q.get(), 'sent')
151 def test_two_waiters_one_dies(self):
156 dying = eventlet.spawn(do_bail, q)
157 waiting = eventlet.spawn(waiter, q)
160 self.assertEqual(dying.wait(), 'timed out')
161 self.assertEqual(waiting.wait(), 'hi')
163 def test_two_bogus_waiters(self):
165 gt1 = eventlet.spawn(do_bail, q)
166 gt2 = eventlet.spawn(do_bail, q)
169 self.assertEqual(gt1.wait(), 'timed out')
170 self.assertEqual(gt2.wait(), 'timed out')
171 self.assertEqual(q.get(), 'sent')
173 def test_waiting(self):
175 gt1 = eventlet.spawn(q.get)
177 self.assertEqual(1, q.getting())
180 self.assertEqual(0, q.getting())
181 self.assertEqual('hi', gt1.wait())
182 self.assertEqual(0, q.getting())
184 def test_channel_send(self):
185 channel = eventlet.Queue(0)
188 def another_greenlet():
189 events.append(channel.get())
190 events.append(channel.get())
192 eventlet.spawn(another_greenlet)
194 events.append('sending')
196 events.append('sent hello')
198 events.append('sent world')
200 self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
202 def test_channel_wait(self):
203 channel = eventlet.Queue(0)
206 def another_greenlet():
207 events.append('sending hello')
209 events.append('sending world')
211 events.append('sent world')
213 eventlet.spawn(another_greenlet)
215 events.append('waiting')
216 events.append(channel.get())
217 events.append(channel.get())
219 self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
222 ['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
224 def test_channel_waiters(self):
225 c = eventlet.Queue(0)
226 w1 = eventlet.spawn(c.get)
227 w2 = eventlet.spawn(c.get)
228 w3 = eventlet.spawn(c.get)
230 self.assertEqual(c.getting(), 3)
231 s1 = eventlet.spawn(c.put, 1)
232 s2 = eventlet.spawn(c.put, 2)
233 s3 = eventlet.spawn(c.put, 3)
238 self.assertEqual(c.getting(), 0)
239 # NOTE: we don't guarantee that waiters are served in order
240 results = sorted([w1.wait(), w2.wait(), w3.wait()])
241 self.assertEqual(results, [1, 2, 3])
243 def test_channel_sender_timing_out(self):
244 c = eventlet.Queue(0)
245 self.assertRaises(queue.Full, c.put, "hi", timeout=0.001)
246 self.assertRaises(queue.Empty, c.get_nowait)
248 def test_task_done(self):
249 channel = queue.Queue(0)
251 gt = eventlet.spawn(channel.put, X)
252 result = channel.get()
253 assert result is X, (result, X)
254 assert channel.unfinished_tasks == 1, channel.unfinished_tasks
256 assert channel.unfinished_tasks == 0, channel.unfinished_tasks
259 def test_join_doesnt_block_when_queue_is_already_empty(self):
260 queue = eventlet.Queue()
264 def store_result(result, func, *args):
266 result.append(func(*args))
267 except Exception as exc:
271 class TestNoWait(LimitedTestCase):
272 def test_put_nowait_simple(self):
275 q = eventlet.Queue(1)
276 hub.schedule_call_global(0, store_result, result, q.put_nowait, 2)
277 hub.schedule_call_global(0, store_result, result, q.put_nowait, 3)
280 assert len(result) == 2, result
281 assert result[0] is None, result
282 assert isinstance(result[1], queue.Full), result
284 def test_get_nowait_simple(self):
289 hub.schedule_call_global(0, store_result, result, q.get_nowait)
290 hub.schedule_call_global(0, store_result, result, q.get_nowait)
292 assert len(result) == 2, result
293 assert result[0] == 4, result
294 assert isinstance(result[1], queue.Empty), result
296 # get_nowait must work from the mainloop
297 def test_get_nowait_unlock(self):
301 p = eventlet.spawn(q.put, 5)
307 hub.schedule_call_global(0, store_result, result, q.get_nowait)
311 assert result == [5], result
312 # TODO add ready to greenthread
313 # assert p.ready(), p
317 # put_nowait must work from the mainloop
318 def test_put_nowait_unlock(self):
322 eventlet.spawn(q.get)
328 hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
329 # TODO ready method on greenthread
330 # assert not p.ready(), p
332 assert result == [None], result
334 # assert p.ready(), p
339 if __name__ == '__main__':