Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / event_test.py
1 import eventlet
2 from eventlet import event
3 from tests import LimitedTestCase
4
5
class TestEvent(LimitedTestCase):
    """Tests for ``event.Event``: send/wait, multiple waiters, reset, errors."""

    def test_waiting_for_event(self):
        """A waiter receives the value sent from another greenthread."""
        evt = event.Event()
        value = 'some stuff'

        def send_to_event():
            evt.send(value)
        eventlet.spawn_n(send_to_event)
        self.assertEqual(evt.wait(), value)

    def test_multiple_waiters(self):
        """All waiters are woken by a single send."""
        self._test_multiple_waiters(False)

    def test_multiple_waiters_with_exception(self):
        """All waiters are woken even when each one raises afterwards."""
        self._test_multiple_waiters(True)

    def _test_multiple_waiters(self, exception):
        """Spawn several waiters on one event and check each one was woken.

        :param exception: when true, every waiter raises ``Exception`` after
            it has recorded its result and signalled completion.
        """
        evt = event.Event()
        results = []

        def wait_on_event(i_am_done):
            evt.wait()
            results.append(True)
            # Signal completion before (optionally) raising, so the main
            # greenthread is never left waiting on this waiter.
            i_am_done.send()
            if exception:
                raise Exception()

        waiters = []
        count = 5
        for _ in range(count):
            waiters.append(event.Event())
            eventlet.spawn_n(wait_on_event, waiters[-1])
        eventlet.sleep()  # allow spawns to start executing
        evt.send()

        for w in waiters:
            w.wait()

        self.assertEqual(len(results), count)

    def test_reset(self):
        """An event can be re-sent only after it has been reset."""
        evt = event.Event()

        # calling reset before send should throw
        self.assertRaises(AssertionError, evt.reset)

        value = 'some stuff'

        def send_to_event():
            evt.send(value)
        eventlet.spawn_n(send_to_event)
        self.assertEqual(evt.wait(), value)

        # now try it again, and we should get the same exact value,
        # and we shouldn't be allowed to resend without resetting
        value2 = 'second stuff'
        self.assertRaises(AssertionError, evt.send, value2)
        self.assertEqual(evt.wait(), value)

        # reset and everything should be happy
        evt.reset()

        def send_to_event2():
            evt.send(value2)
        eventlet.spawn_n(send_to_event2)
        self.assertEqual(evt.wait(), value2)

    def test_double_exception(self):
        """After reset, a previously sent exception is not raised again."""
        evt = event.Event()
        # send an exception through the event
        evt.send(exc=RuntimeError('from test_double_exception'))
        self.assertRaises(RuntimeError, evt.wait)
        evt.reset()
        # shouldn't see the RuntimeError again
        timeout = eventlet.Timeout(0.001)
        try:
            self.assertRaises(eventlet.Timeout, evt.wait)
        finally:
            # Always disarm the timer: if wait() did not block until the
            # timeout fired (e.g. the assertion failed), a still-pending
            # Timeout could otherwise fire inside a later, unrelated test.
            timeout.cancel()