Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / subprocess_test.py
1 import sys
2 import time
3
4 import eventlet
5 from eventlet.green import subprocess
6 import eventlet.patcher
7 import tests
8 original_subprocess = eventlet.patcher.original('subprocess')
9
10
def test_subprocess_wait():
    # https://bitbucket.org/eventlet/eventlet/issue/89
    # In Python 3.3 subprocess.Popen.wait() method acquired `timeout`
    # argument.
    # RHEL backported it to their Python 2.6 package.
    #
    # Checks that the green Popen.wait(timeout=...) raises TimeoutExpired
    # carrying the original command and timeout, and that it gives up
    # close to the requested deadline.
    cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]
    p = subprocess.Popen(cmd)
    try:
        ok = False
        t1 = time.time()
        try:
            p.wait(timeout=0.1)
        except subprocess.TimeoutExpired as e:
            str(e)  # make sure it doesn't throw
            assert e.cmd == cmd
            assert e.timeout == 0.1
            ok = True
        tdiff = time.time() - t1
        assert ok, 'did not raise subprocess.TimeoutExpired'
        assert 0.1 <= tdiff <= 0.2, 'did not stop within allowed time'
    finally:
        # The child outlives the 0.1s timeout by design; terminate and reap
        # it so the test does not leak a running process (or a zombie).
        # Only signal while the child is still alive, so we never kill a
        # pid that has already been reaped.
        if p.poll() is None:
            p.kill()
        p.wait()
30
31
def test_communicate_with_poll():
    # This test was being skipped since git 25812fca8, I don't think there's
    # a need to do this. The original comment:
    #
    # https://github.com/eventlet/eventlet/pull/24
    # `eventlet.green.subprocess.Popen.communicate()` was broken
    # in Python 2.7 because the usage of the `select` module was moved from
    # `_communicate` into two other methods `_communicate_with_select`
    # and `_communicate_with_poll`. Link to 2.7's implementation:
    # http://hg.python.org/cpython/file/2145593d108d/Lib/subprocess.py#l1255

    p = subprocess.Popen(
        [sys.executable, '-c', 'import time; time.sleep(0.5)'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        t1 = time.time()
        # communicate() would block for ~0.5s; with_timeout must interrupt it
        # after 0.1s and hand back `timeout_value` instead of raising.
        eventlet.with_timeout(0.1, p.communicate, timeout_value=True)
        tdiff = time.time() - t1
        assert 0.1 <= tdiff <= 0.2, 'did not stop within allowed time'
    finally:
        # communicate() was cut short, so the child is still running and its
        # pipes are still open; terminate, reap and close them explicitly so
        # the test does not leak a process or file descriptors.
        if p.poll() is None:
            p.kill()
        p.wait()
        for pipe in (p.stdout, p.stderr):
            if pipe is not None:
                pipe.close()
50
51
def test_close_popen_stdin_with_close_fds():
    # After communicate() has returned, explicitly closing the child's stdin
    # pipe must not raise when the process was started with close_fds=True.
    popen_kwargs = dict(
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        shell=False,
        cwd=None,
        env=None,
    )
    proc = subprocess.Popen(['ls'], **popen_kwargs)

    proc.communicate(None)

    # Record any failure from close() and report it through the assertion
    # message rather than letting it propagate as an error.
    close_error = None
    try:
        proc.stdin.close()
    except Exception as exc:
        close_error = exc
    assert close_error is None, (
        "Exception should not be raised, got %r instead" % close_error)
69
70
def test_universal_lines():
    # Smoke test: communicate() must complete without raising when the
    # child's stdout is piped with universal_newlines=True.
    version_cmd = [sys.executable, '--version']
    child = subprocess.Popen(
        version_cmd,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        shell=False)
    child.communicate(None)
78
79
def test_patched_communicate_290():
    # Regression test for https://github.com/eventlet/eventlet/issues/290
    # A certain order of import and monkey_patch breaks subprocess
    # communicate() with "AttributeError: module `select` has no `poll`"
    # on Linux; unpatched methods are removed for safety reasons in
    # commit f63165c0e3.
    isolated_script = 'subprocess_patched_communicate.py'
    tests.run_isolated(isolated_script)