Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / tpool.py
index 8d73814d437b6586c9eee218e793f2e7d724b99e..618c37799136acc23e7a854910f9e03e29fa4755 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import atexit
 import imp
 import os
 import sys
 import traceback
 
+import eventlet
 from eventlet import event, greenio, greenthread, patcher, timeout
 from eventlet.support import six
 
@@ -39,7 +41,7 @@ if six.PY3:
 Empty = Queue_module.Empty
 Queue = Queue_module.Queue
 
-_bytetosend = ' '.encode()
+_bytetosend = b' '
 _coro = None
 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
 _reqq = _rspq = None
@@ -54,6 +56,7 @@ def tpool_trampoline():
         try:
             _c = _rsock.recv(1)
             assert _c
+        # FIXME: this is probably redundant since using sockets instead of pipe now
         except ValueError:
             break  # will be raised when pipe is closed
         while not _rspq.empty():
@@ -250,29 +253,33 @@ class Proxy(object):
 
 
 def setup():
-    global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
+    global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
     if _setup_already:
         return
     else:
         _setup_already = True
 
+    assert _nthreads >= 0, "Can't specify negative number of threads"
+    if _nthreads == 0:
+        import warnings
+        warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
+            execute in main thread.  Check the value of the environment \
+            variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
+    _reqq = Queue(maxsize=-1)
+    _rspq = Queue(maxsize=-1)
+
+    # connected socket pair
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.bind(('127.0.0.1', 0))
     sock.listen(1)
     csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     csock.connect(sock.getsockname())
+    csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
     _wsock, _addr = sock.accept()
+    _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
     sock.close()
     _rsock = greenio.GreenSocket(csock)
 
-    _reqq = Queue(maxsize=-1)
-    _rspq = Queue(maxsize=-1)
-    assert _nthreads >= 0, "Can't specify negative number of threads"
-    if _nthreads == 0:
-        import warnings
-        warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
-            execute in main thread.  Check the value of the environment \
-            variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
     for i in six.moves.range(_nthreads):
         t = threading.Thread(target=tworker,
                              name="tpool_thread_%s" % i)
@@ -281,12 +288,20 @@ def setup():
         _threads.append(t)
 
     _coro = greenthread.spawn_n(tpool_trampoline)
+    # This yield fixes subtle error with GreenSocket.__del__
+    eventlet.sleep(0)
 
 
+# Avoid ResourceWarning unclosed socket on Python3.2+
+@atexit.register
 def killall():
     global _setup_already, _rspq, _rsock, _wsock
     if not _setup_already:
         return
+
+    # This yield fixes freeze in some scenarios
+    eventlet.sleep(0)
+
     for thr in _threads:
         _reqq.put(None)
     for thr in _threads:
@@ -294,7 +309,7 @@ def killall():
     del _threads[:]
 
     # return any remaining results
-    while not _rspq.empty():
+    while (_rspq is not None) and not _rspq.empty():
         try:
             (e, rv) = _rspq.get(block=False)
             e.send(rv)
@@ -304,10 +319,12 @@ def killall():
 
     if _coro is not None:
         greenthread.kill(_coro)
-    _rsock.close()
-    _wsock.close()
-    _rsock = None
-    _wsock = None
+    if _rsock is not None:
+        _rsock.close()
+        _rsock = None
+    if _wsock is not None:
+        _wsock.close()
+        _wsock = None
     _rspq = None
     _setup_already = False