X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=python-eventlet%2Feventlet%2Ftpool.py;h=618c37799136acc23e7a854910f9e03e29fa4755;hb=refs%2Fheads%2Fmaster;hp=8d73814d437b6586c9eee218e793f2e7d724b99e;hpb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/python-eventlet/eventlet/tpool.py b/python-eventlet/eventlet/tpool.py index 8d73814..618c377 100644 --- a/python-eventlet/eventlet/tpool.py +++ b/python-eventlet/eventlet/tpool.py @@ -13,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import atexit import imp import os import sys import traceback +import eventlet from eventlet import event, greenio, greenthread, patcher, timeout from eventlet.support import six @@ -39,7 +41,7 @@ if six.PY3: Empty = Queue_module.Empty Queue = Queue_module.Queue -_bytetosend = ' '.encode() +_bytetosend = b' ' _coro = None _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20)) _reqq = _rspq = None @@ -54,6 +56,7 @@ def tpool_trampoline(): try: _c = _rsock.recv(1) assert _c + # FIXME: this is probably redundant since using sockets instead of pipe now except ValueError: break # will be raised when pipe is closed while not _rspq.empty(): @@ -250,29 +253,33 @@ class Proxy(object): def setup(): - global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq + global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq if _setup_already: return else: _setup_already = True + assert _nthreads >= 0, "Can't specify negative number of threads" + if _nthreads == 0: + import warnings + warnings.warn("Zero threads in tpool. All tpool.execute calls will\ + execute in main thread. Check the value of the environment \ + variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning) + _reqq = Queue(maxsize=-1) + _rspq = Queue(maxsize=-1) + + # connected socket pair sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 0)) sock.listen(1) csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) csock.connect(sock.getsockname()) + csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) _wsock, _addr = sock.accept() + _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) sock.close() _rsock = greenio.GreenSocket(csock) - _reqq = Queue(maxsize=-1) - _rspq = Queue(maxsize=-1) - assert _nthreads >= 0, "Can't specify negative number of threads" - if _nthreads == 0: - import warnings - warnings.warn("Zero threads in tpool. All tpool.execute calls will\ - execute in main thread. Check the value of the environment \ - variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning) for i in six.moves.range(_nthreads): t = threading.Thread(target=tworker, name="tpool_thread_%s" % i) @@ -281,12 +288,20 @@ def setup(): _threads.append(t) _coro = greenthread.spawn_n(tpool_trampoline) + # This yield fixes subtle error with GreenSocket.__del__ + eventlet.sleep(0) +# Avoid ResourceWarning unclosed socket on Python3.2+ +@atexit.register def killall(): global _setup_already, _rspq, _rsock, _wsock if not _setup_already: return + + # This yield fixes freeze in some scenarios + eventlet.sleep(0) + for thr in _threads: _reqq.put(None) for thr in _threads: @@ -294,7 +309,7 @@ def killall(): del _threads[:] # return any remaining results - while not _rspq.empty(): + while (_rspq is not None) and not _rspq.empty(): try: (e, rv) = _rspq.get(block=False) e.send(rv) @@ -304,10 +319,12 @@ def killall(): if _coro is not None: greenthread.kill(_coro) - _rsock.close() - _wsock.close() - _rsock = None - _wsock = None + if _rsock is not None: + _rsock.close() + _rsock = None + if _wsock is not None: + _wsock.close() + _wsock = None _rspq = None _setup_already = False