2 from eventlet import event, hubs, queue
3 from tests import LimitedTestCase, main
7 eventlet.Timeout(0, RuntimeError())
15 class TestQueue(LimitedTestCase):
16 def test_send_first(self):
19 self.assertEqual(q.get(), 'hi')
21 def test_send_last(self):
25 self.assertEqual(q.get(), 'hi2')
27 gt = eventlet.spawn(eventlet.with_timeout, 0.1, waiter, q)
33 def test_max_size(self):
45 gt = eventlet.spawn(putter, q)
47 self.assertEqual(results, ['a', 'b'])
48 self.assertEqual(q.get(), 'a')
50 self.assertEqual(results, ['a', 'b', 'c'])
51 self.assertEqual(q.get(), 'b')
52 self.assertEqual(q.get(), 'c')
55 def test_zero_max_size(self):
67 gt = eventlet.spawn(sender, evt, q)
69 assert not evt.ready()
70 gt2 = eventlet.spawn(receiver, q)
71 self.assertEqual(gt2.wait(), 'hi')
72 self.assertEqual(evt.wait(), 'done')
75 def test_resize_up(self):
83 gt = eventlet.spawn(sender, evt, q)
85 assert not evt.ready()
91 def test_resize_down(self):
97 self.assertEqual(list(q.queue), list(range(5)))
100 self.assertEqual(list(q.queue), list(range(5)))
102 def test_resize_to_Unlimited(self):
103 q = eventlet.Queue(0)
110 gt = eventlet.spawn(sender, evt, q)
112 self.assertFalse(evt.ready())
115 self.assertTrue(evt.ready())
118 def test_multiple_waiters(self):
119 # tests that multiple waiters get their results back
122 sendings = ['1', '2', '3', '4']
123 gts = [eventlet.spawn(q.get) for x in sendings]
125 eventlet.sleep(0.01) # get 'em all waiting
132 for i, gt in enumerate(gts):
133 results.add(gt.wait())
134 self.assertEqual(results, set(sendings))
136 def test_waiters_that_cancel(self):
139 gt = eventlet.spawn(do_bail, q)
140 self.assertEqual(gt.wait(), 'timed out')
143 self.assertEqual(q.get(), 'hi')
145 def test_getting_before_sending(self):
147 gt = eventlet.spawn(q.put, 'sent')
148 self.assertEqual(q.get(), 'sent')
151 def test_two_waiters_one_dies(self):
156 dying = eventlet.spawn(do_bail, q)
157 waiting = eventlet.spawn(waiter, q)
160 self.assertEqual(dying.wait(), 'timed out')
161 self.assertEqual(waiting.wait(), 'hi')
163 def test_two_bogus_waiters(self):
165 gt1 = eventlet.spawn(do_bail, q)
166 gt2 = eventlet.spawn(do_bail, q)
169 self.assertEqual(gt1.wait(), 'timed out')
170 self.assertEqual(gt2.wait(), 'timed out')
171 self.assertEqual(q.get(), 'sent')
173 def test_waiting(self):
175 gt1 = eventlet.spawn(q.get)
177 self.assertEqual(1, q.getting())
180 self.assertEqual(0, q.getting())
181 self.assertEqual('hi', gt1.wait())
182 self.assertEqual(0, q.getting())
184 def test_channel_send(self):
185 channel = eventlet.Queue(0)
188 def another_greenlet():
189 events.append(channel.get())
190 events.append(channel.get())
192 eventlet.spawn(another_greenlet)
194 events.append('sending')
196 events.append('sent hello')
198 events.append('sent world')
200 self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)
202 def test_channel_wait(self):
203 channel = eventlet.Queue(0)
206 def another_greenlet():
207 events.append('sending hello')
209 events.append('sending world')
211 events.append('sent world')
213 eventlet.spawn(another_greenlet)
215 events.append('waiting')
216 events.append(channel.get())
217 events.append(channel.get())
219 self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
221 self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
223 def test_channel_waiters(self):
224 c = eventlet.Queue(0)
225 w1 = eventlet.spawn(c.get)
226 w2 = eventlet.spawn(c.get)
227 w3 = eventlet.spawn(c.get)
229 self.assertEqual(c.getting(), 3)
230 s1 = eventlet.spawn(c.put, 1)
231 s2 = eventlet.spawn(c.put, 2)
232 s3 = eventlet.spawn(c.put, 3)
237 self.assertEqual(c.getting(), 0)
238 # NOTE: we don't guarantee that waiters are served in order
239 results = sorted([w1.wait(), w2.wait(), w3.wait()])
240 self.assertEqual(results, [1, 2, 3])
242 def test_channel_sender_timing_out(self):
243 c = eventlet.Queue(0)
244 self.assertRaises(queue.Full, c.put, "hi", timeout=0.001)
245 self.assertRaises(queue.Empty, c.get_nowait)
247 def test_task_done(self):
248 channel = queue.Queue(0)
250 gt = eventlet.spawn(channel.put, X)
251 result = channel.get()
252 assert result is X, (result, X)
253 assert channel.unfinished_tasks == 1, channel.unfinished_tasks
255 assert channel.unfinished_tasks == 0, channel.unfinished_tasks
258 def test_join_doesnt_block_when_queue_is_already_empty(self):
259 queue = eventlet.Queue()
263 def store_result(result, func, *args):
265 result.append(func(*args))
266 except Exception as exc:
270 class TestNoWait(LimitedTestCase):
271 def test_put_nowait_simple(self):
274 q = eventlet.Queue(1)
275 hub.schedule_call_global(0, store_result, result, q.put_nowait, 2)
276 hub.schedule_call_global(0, store_result, result, q.put_nowait, 3)
279 assert len(result) == 2, result
280 assert result[0] is None, result
281 assert isinstance(result[1], queue.Full), result
283 def test_get_nowait_simple(self):
288 hub.schedule_call_global(0, store_result, result, q.get_nowait)
289 hub.schedule_call_global(0, store_result, result, q.get_nowait)
291 assert len(result) == 2, result
292 assert result[0] == 4, result
293 assert isinstance(result[1], queue.Empty), result
295 # get_nowait must work from the mainloop
296 def test_get_nowait_unlock(self):
300 p = eventlet.spawn(q.put, 5)
306 hub.schedule_call_global(0, store_result, result, q.get_nowait)
310 assert result == [5], result
311 # TODO add ready to greenthread
312 # assert p.ready(), p
316 # put_nowait must work from the mainloop
317 def test_put_nowait_unlock(self):
321 eventlet.spawn(q.get)
327 hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
328 # TODO ready method on greenthread
329 # assert not p.ready(), p
331 assert result == [None], result
333 # assert p.ready(), p
338 if __name__ == '__main__':