Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / fork_test.py
1 from tests.patcher_test import ProcessBase
2
3
4 class ForkTest(ProcessBase):
5     def test_simple(self):
6         newmod = '''
7 import eventlet
8 import os
9 import sys
10 import signal
11 from eventlet.support import bytes_to_str, six
12 mydir = %r
13 signal_file = os.path.join(mydir, "output.txt")
14 pid = os.fork()
15 if (pid != 0):
16   eventlet.Timeout(10)
17   try:
18     port = None
19     while True:
20       try:
21         contents = open(signal_file, "rb").read()
22         port = int(contents.split()[0])
23         break
24       except (IOError, IndexError, ValueError, TypeError):
25         eventlet.sleep(0.1)
26     eventlet.connect(('127.0.0.1', port))
27     while True:
28       try:
29         contents = open(signal_file, "rb").read()
30         result = contents.split()[1]
31         break
32       except (IOError, IndexError):
33         eventlet.sleep(0.1)
34     print('result {0}'.format(bytes_to_str(result)))
35   finally:
36     os.kill(pid, signal.SIGTERM)
37 else:
38   try:
39     s = eventlet.listen(('', 0))
40     fd = open(signal_file, "wb")
41     fd.write(six.b(str(s.getsockname()[1])))
42     fd.write(b"\\n")
43     fd.flush()
44     s.accept()
45     fd.write(b"done")
46     fd.flush()
47   finally:
48     fd.close()
49 '''
50         self.write_to_tempfile("newmod", newmod % self.tempdir)
51         output, lines = self.launch_subprocess('newmod.py')
52         self.assertEqual(lines[0], "result done", output)