Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / manual / greenio_memtest.py
diff --git a/eventlet/tests/manual/greenio_memtest.py b/eventlet/tests/manual/greenio_memtest.py
new file mode 100644 (file)
index 0000000..9117b10
--- /dev/null
@@ -0,0 +1,83 @@
+import eventlet
+from eventlet import greenio
+import os
+
+
+__test__ = False
+_proc_status = '/proc/%d/status' % os.getpid()
+
+_scale = {'kB': 1024.0, 'mB': 1024.0 * 1024.0,
+          'KB': 1024.0, 'MB': 1024.0 * 1024.0}
+
+
+def _VmB(VmKey):
+    '''Private.
+    '''
+    global _proc_status, _scale
+    # get pseudo file  /proc/<pid>/status
+    try:
+        t = open(_proc_status)
+        v = t.read()
+        t.close()
+    except:
+        return 0.0  # non-Linux?
+    # get VmKey line e.g. 'VmRSS:  9999  kB\n ...'
+    i = v.index(VmKey)
+    v = v[i:].split(None, 3)  # whitespace
+    if len(v) < 3:
+        return 0.0  # invalid format?
+    # convert Vm value to bytes
+    return float(v[1]) * _scale[v[2]]
+
+
+def memory(since=0.0):
+    '''Return memory usage in bytes.
+    '''
+    return _VmB('VmSize:') - since
+
+
+def resident(since=0.0):
+    '''Return resident memory usage in bytes.
+    '''
+    return _VmB('VmRSS:') - since
+
+
+def stacksize(since=0.0):
+    '''Return stack size in bytes.
+    '''
+    return _VmB('VmStk:') - since
+
+
+def test_pipe_writes_large_messages():
+    r, w = os.pipe()
+
+    r = greenio.GreenPipe(r)
+    w = greenio.GreenPipe(w, 'w')
+
+    large_message = b"".join([1024 * chr(i) for i in range(65)])
+
+    def writer():
+        w.write(large_message)
+        w.close()
+
+    gt = eventlet.spawn(writer)
+
+    for i in range(65):
+        buf = r.read(1024)
+        expected = 1024 * chr(i)
+        if buf != expected:
+            print(
+                "expected=%r..%r, found=%r..%r iter=%d"
+                % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
+    gt.wait()
+
+
+if __name__ == "__main__":
+    _iter = 1
+    while True:
+        test_pipe_writes_large_messages()
+
+        _iter += 1
+        if _iter % 10 == 0:
+            print("_iter = %d, VmSize: %d, VmRSS = %d, VmStk = %d" %
+                  (_iter, memory(), resident(), stacksize()))