# Shared test helpers come from the project's own `tests` package.
3 from tests import LimitedTestCase, main, skip_on_windows
# DeprecationWarning is filtered with the 'ignore' action only for the duration
# of the eventlet import below; the 'default' action is installed again right
# after it. Presumably the import itself emits a DeprecationWarning -- confirm.
# NOTE(review): the `import warnings` statement is not visible in this excerpt.
5 warnings.simplefilter('ignore', DeprecationWarning)
6 from eventlet import processes, api
7 warnings.simplefilter('default', DeprecationWarning)
# Tests built around `echo`: a processes.ProcessPool running `echo hello`, plus
# one standalone processes.Process running `echo -n`.
# NOTE(review): this listing is sparse (the embedded line numbers skip), so the
# `def` lines for setUp and the first test, and the statements that assign
# `result`, are not visible here.
10 class TestEchoPool(LimitedTestCase):
# Fixture: chain to the base-class setUp, then create the pool under test.
12 super(TestEchoPool, self).setUp()
13 self.pool = processes.ProcessPool('echo', ["hello"])
# Take a process from the pool; `result` must equal exactly 'hello\n'.
# Presumably `result` is what was read from `proc` -- the read is not shown.
19 proc = self.pool.get()
24 self.assertEqual(result, 'hello\n')
27 def test_read_eof(self):
28 proc = self.pool.get()
# At this point proc.read is required to raise processes.DeadProcess; whatever
# happens to `proc` between get() and this assertion is on elided lines.
31 self.assertRaises(processes.DeadProcess, proc.read)
36 def test_empty_echo(self):
# Uses a Process directly rather than the pool: with `echo -n` the first read
# must return the empty string and a second read must raise DeadProcess.
37 p = processes.Process('echo', ['-n'])
38 self.assertEqual('', p.read())
39 self.assertRaises(processes.DeadProcess, p.read)
# Tests built around a processes.ProcessPool running `cat` with no arguments.
# NOTE(review): this listing is sparse (the embedded line numbers skip); the
# `def` lines for setUp and for two of the tests, and every step between
# pool.get() and the assertions, are not visible here.
42 class TestCatPool(LimitedTestCase):
# Fixture: chain to the base-class setUp, then create the pool under test.
44 super(TestCatPool, self).setUp()
46 self.pool = processes.ProcessPool('cat')
# Take a process from the pool; `result` must equal 'goodbye' (no trailing
# newline). Presumably that text was written to `proc` and read back -- the
# write/read statements are not shown.
52 proc = self.pool.get()
60 self.assertEqual(result, 'goodbye')
63 def test_write_to_dead(self):
66 proc = self.pool.get()
# Writing 'foo' must raise processes.DeadProcess; how the process comes to be
# dead before this call is on elided lines.
71 self.assertRaises(processes.DeadProcess, proc.write, 'foo')
# A further test whose `def` line is not visible: writing 'goodbye' to the
# process obtained here must also raise processes.DeadProcess.
79 proc = self.pool.get()
83 self.assertRaises(processes.DeadProcess, proc.write, 'goodbye')
# Checks pool behaviour when a pooled process has died, using an `echo hello`
# pool limited to a single process (max_size=1).
# NOTE(review): this listing is sparse (the embedded line numbers skip); the
# setUp `def` line, the `try:` line and the handler body are not visible here.
88 class TestDyingProcessesLeavePool(LimitedTestCase):
# Fixture: chain to the base-class setUp, then create the one-slot pool.
90 super(TestDyingProcessesLeavePool, self).setUp()
91 self.pool = processes.ProcessPool('echo', ['hello'], max_size=1)
94 def test_dead_process_not_inserted_into_pool(self):
95 proc = self.pool.get()
# `result` must equal 'hello\n'; presumably it was read from `proc` on an
# elided line.
99 self.assertEqual(result, 'hello\n')
# processes.DeadProcess is caught here; what the handler does is not shown.
101 except processes.DeadProcess:
# A second get() must hand back a different object than the first one. Given
# max_size=1 and the test name, this is the check that the dead process was not
# reused by the pool.
105 proc2 = self.pool.get()
106 assert proc is not proc2
109 if __name__ == '__main__':