# See the License for the specific language governing permissions and
# limitations under the License.
+import atexit
import imp
import os
import sys
import traceback
+import eventlet
from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet.support import six
Empty = Queue_module.Empty
Queue = Queue_module.Queue
-_bytetosend = ' '.encode()
+_bytetosend = b' '
_coro = None
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
_reqq = _rspq = None
try:
_c = _rsock.recv(1)
assert _c
+ # FIXME: this is probably redundant since using sockets instead of pipe now
except ValueError:
break # will be raised when pipe is closed
while not _rspq.empty():
def setup():
- global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
+ global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
if _setup_already:
return
else:
_setup_already = True
+ assert _nthreads >= 0, "Can't specify negative number of threads"
+ if _nthreads == 0:
+ import warnings
+ warnings.warn("Zero threads in tpool. All tpool.execute calls will\
+ execute in main thread. Check the value of the environment \
+ variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
+ _reqq = Queue(maxsize=-1)
+ _rspq = Queue(maxsize=-1)
+
+ # connected socket pair
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
sock.listen(1)
csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
csock.connect(sock.getsockname())
+ csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
_wsock, _addr = sock.accept()
+ _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
sock.close()
_rsock = greenio.GreenSocket(csock)
- _reqq = Queue(maxsize=-1)
- _rspq = Queue(maxsize=-1)
- assert _nthreads >= 0, "Can't specify negative number of threads"
- if _nthreads == 0:
- import warnings
- warnings.warn("Zero threads in tpool. All tpool.execute calls will\
- execute in main thread. Check the value of the environment \
- variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
for i in six.moves.range(_nthreads):
t = threading.Thread(target=tworker,
name="tpool_thread_%s" % i)
_threads.append(t)
_coro = greenthread.spawn_n(tpool_trampoline)
+ # This yield fixes subtle error with GreenSocket.__del__
+ eventlet.sleep(0)
+# Avoid ResourceWarning unclosed socket on Python3.2+
+@atexit.register
def killall():
global _setup_already, _rspq, _rsock, _wsock
if not _setup_already:
return
+
+ # This yield fixes freeze in some scenarios
+ eventlet.sleep(0)
+
for thr in _threads:
_reqq.put(None)
for thr in _threads:
del _threads[:]
# return any remaining results
- while not _rspq.empty():
+ while (_rspq is not None) and not _rspq.empty():
try:
(e, rv) = _rspq.get(block=False)
e.send(rv)
if _coro is not None:
greenthread.kill(_coro)
- _rsock.close()
- _wsock.close()
- _rsock = None
- _wsock = None
+ if _rsock is not None:
+ _rsock.close()
+ _rsock = None
+ if _wsock is not None:
+ _wsock.close()
+ _wsock = None
_rspq = None
_setup_already = False