Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / manual / greenio_memtest.py
1 import eventlet
2 from eventlet import greenio
3 import os
4
5
6 __test__ = False
7 _proc_status = '/proc/%d/status' % os.getpid()
8
9 _scale = {'kB': 1024.0, 'mB': 1024.0 * 1024.0,
10           'KB': 1024.0, 'MB': 1024.0 * 1024.0}
11
12
13 def _VmB(VmKey):
14     '''Private.
15     '''
16     global _proc_status, _scale
17     # get pseudo file  /proc/<pid>/status
18     try:
19         t = open(_proc_status)
20         v = t.read()
21         t.close()
22     except:
23         return 0.0  # non-Linux?
24     # get VmKey line e.g. 'VmRSS:  9999  kB\n ...'
25     i = v.index(VmKey)
26     v = v[i:].split(None, 3)  # whitespace
27     if len(v) < 3:
28         return 0.0  # invalid format?
29     # convert Vm value to bytes
30     return float(v[1]) * _scale[v[2]]
31
32
33 def memory(since=0.0):
34     '''Return memory usage in bytes.
35     '''
36     return _VmB('VmSize:') - since
37
38
39 def resident(since=0.0):
40     '''Return resident memory usage in bytes.
41     '''
42     return _VmB('VmRSS:') - since
43
44
45 def stacksize(since=0.0):
46     '''Return stack size in bytes.
47     '''
48     return _VmB('VmStk:') - since
49
50
51 def test_pipe_writes_large_messages():
52     r, w = os.pipe()
53
54     r = greenio.GreenPipe(r)
55     w = greenio.GreenPipe(w, 'w')
56
57     large_message = b"".join([1024 * chr(i) for i in range(65)])
58
59     def writer():
60         w.write(large_message)
61         w.close()
62
63     gt = eventlet.spawn(writer)
64
65     for i in range(65):
66         buf = r.read(1024)
67         expected = 1024 * chr(i)
68         if buf != expected:
69             print(
70                 "expected=%r..%r, found=%r..%r iter=%d"
71                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
72     gt.wait()
73
74
75 if __name__ == "__main__":
76     _iter = 1
77     while True:
78         test_pipe_writes_large_messages()
79
80         _iter += 1
81         if _iter % 10 == 0:
82             print("_iter = %d, VmSize: %d, VmRSS = %d, VmStk = %d" %
83                   (_iter, memory(), resident(), stacksize()))