Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / processes_test.py
1 import sys
2 import warnings
3 from tests import LimitedTestCase, main, skip_on_windows
4
5 warnings.simplefilter('ignore', DeprecationWarning)
6 from eventlet import processes, api
7 warnings.simplefilter('default', DeprecationWarning)
8
9
class TestEchoPool(LimitedTestCase):
    """Read-side tests for processes spawned from the ``echo`` command."""

    def setUp(self):
        """Create a process pool built from ``'echo'`` and ``["hello"]``."""
        super(TestEchoPool, self).setUp()
        self.pool = processes.ProcessPool('echo', ["hello"])

    # A pooled process is expected to produce the echoed line on read.
    @skip_on_windows
    def test_echo(self):
        echo_proc = self.pool.get()
        try:
            output = echo_proc.read()
        finally:
            # Hand the process back whether or not the read succeeded.
            self.pool.put(echo_proc)
        self.assertEqual(output, 'hello\n')

    # Once the output has been consumed, a further read must raise
    # DeadProcess.
    @skip_on_windows
    def test_read_eof(self):
        echo_proc = self.pool.get()
        try:
            echo_proc.read()
            self.assertRaises(processes.DeadProcess, echo_proc.read)
        finally:
            self.pool.put(echo_proc)

    # ``echo -n`` is expected to give an empty first read; the next read
    # must raise DeadProcess.
    @skip_on_windows
    def test_empty_echo(self):
        silent = processes.Process('echo', ['-n'])
        first_read = silent.read()
        self.assertEqual('', first_read)
        self.assertRaises(processes.DeadProcess, silent.read)
40
41
class TestCatPool(LimitedTestCase):
    """Write-side tests for processes spawned from the ``cat`` command."""

    def setUp(self):
        """Create a process pool built from the ``'cat'`` command."""
        super(TestCatPool, self).setUp()
        # NOTE(review): presumably yields to the eventlet hub once before the
        # pool is created; the reason is not visible in this file - confirm.
        api.sleep(0)
        self.pool = processes.ProcessPool('cat')

    # Data written to the process is expected back unchanged once stdin is
    # closed.
    @skip_on_windows
    def test_cat(self):
        proc = self.pool.get()
        try:
            proc.write('goodbye')
            proc.close_stdin()
            result = proc.read()
        finally:
            # Always return the process to the pool, even on failure.
            self.pool.put(proc)

        self.assertEqual(result, 'goodbye')

    # After stdin is closed and the output has been read, writing must raise
    # DeadProcess.
    @skip_on_windows
    def test_write_to_dead(self):
        proc = self.pool.get()
        try:
            proc.write('goodbye')
            proc.close_stdin()
            # The output itself is checked by test_cat; here it is only
            # consumed before the write attempt.
            proc.read()
            self.assertRaises(processes.DeadProcess, proc.write, 'foo')
        finally:
            self.pool.put(proc)

    # After close(), writing must raise DeadProcess.
    @skip_on_windows
    def test_close(self):
        proc = self.pool.get()
        try:
            proc.write('hello')
            proc.close()
            self.assertRaises(processes.DeadProcess, proc.write, 'goodbye')
        finally:
            self.pool.put(proc)
87
class TestDyingProcessesLeavePool(LimitedTestCase):
    """Check that a pool does not hand out a process that has died."""

    def setUp(self):
        """Create an ``echo hello`` pool constructed with ``max_size=1``."""
        super(TestDyingProcessesLeavePool, self).setUp()
        self.pool = processes.ProcessPool('echo', ['hello'], max_size=1)

    # After a process has been read to exhaustion and put back, the next
    # get() must return a different process object.
    @skip_on_windows
    def test_dead_process_not_inserted_into_pool(self):
        proc = self.pool.get()
        try:
            # The first read must succeed; a DeadProcess raised here is a
            # real failure and is deliberately not swallowed.
            result = proc.read()
            self.assertEqual(result, 'hello\n')
            try:
                # Read again so the process is seen as finished; DeadProcess
                # is the tolerated outcome of this second read.
                proc.read()
            except processes.DeadProcess:
                pass
        finally:
            self.pool.put(proc)

        proc2 = self.pool.get()
        try:
            # Use a TestCase assertion rather than a bare ``assert`` so the
            # check still runs under ``python -O``.
            self.assertTrue(proc is not proc2)
        finally:
            # Return the second process as well, matching how every other
            # test in this file releases what it took from a pool.
            self.pool.put(proc2)
107
108
109 if __name__ == '__main__':
110     main()