--- /dev/null
+import unittest
+from eventlet import spawn, sleep, with_timeout
+from eventlet.event import Event
+import eventlet
+from tests import LimitedTestCase
+
+DELAY = 0.01
+
+
+class TestEvent(LimitedTestCase):
+
+ def test_send_exc(self):
+ log = []
+ e = Event()
+
+ def waiter():
+ try:
+ result = e.wait()
+ log.append(('received', result))
+ except Exception as ex:
+ log.append(('catched', ex))
+ spawn(waiter)
+ sleep(0) # let waiter to block on e.wait()
+ obj = Exception()
+ e.send(exc=obj)
+ sleep(0)
+ sleep(0)
+ assert log == [('catched', obj)], log
+
+ def test_send(self):
+ event1 = Event()
+ event2 = Event()
+
+ spawn(event1.send, 'hello event1')
+ eventlet.Timeout(0, ValueError('interrupted'))
+ try:
+ result = event1.wait()
+ except ValueError:
+ X = object()
+ result = with_timeout(DELAY, event2.wait, timeout_value=X)
+ assert result is X, 'Nobody sent anything to event2 yet it received %r' % (result, )
+
+
+if __name__ == '__main__':
+ unittest.main()