+++ /dev/null
-# Copyright (c) 2007-2009, Linden Research, Inc.
-# Copyright (c) 2007, IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import imp
-import os
-import sys
-import traceback
-
-from eventlet import event, greenio, greenthread, patcher, timeout
-from eventlet.support import six
-
-__all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
-
-
-EXC_CLASSES = (Exception, timeout.Timeout)
-SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
-
-QUIET = True
-
-socket = patcher.original('socket')
-threading = patcher.original('threading')
-if six.PY2:
- Queue_module = patcher.original('Queue')
-if six.PY3:
- Queue_module = patcher.original('queue')
-
-Empty = Queue_module.Empty
-Queue = Queue_module.Queue
-
-_bytetosend = ' '.encode()
-_coro = None
-_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
-_reqq = _rspq = None
-_rsock = _wsock = None
-_setup_already = False
-_threads = []
-
-
-def tpool_trampoline():
- global _rspq
- while True:
- try:
- _c = _rsock.recv(1)
- assert _c
- except ValueError:
- break # will be raised when pipe is closed
- while not _rspq.empty():
- try:
- (e, rv) = _rspq.get(block=False)
- e.send(rv)
- e = rv = None
- except Empty:
- pass
-
-
-def tworker():
- global _rspq
- while True:
- try:
- msg = _reqq.get()
- except AttributeError:
- return # can't get anything off of a dud queue
- if msg is None:
- return
- (e, meth, args, kwargs) = msg
- rv = None
- try:
- rv = meth(*args, **kwargs)
- except SYS_EXCS:
- raise
- except EXC_CLASSES:
- rv = sys.exc_info()
- # test_leakage_from_tracebacks verifies that the use of
- # exc_info does not lead to memory leaks
- _rspq.put((e, rv))
- msg = meth = args = kwargs = e = rv = None
- _wsock.sendall(_bytetosend)
-
-
-def execute(meth, *args, **kwargs):
- """
- Execute *meth* in a Python thread, blocking the current coroutine/
- greenthread until the method completes.
-
- The primary use case for this is to wrap an object or module that is not
- amenable to monkeypatching or any of the other tricks that Eventlet uses
- to achieve cooperative yielding. With tpool, you can force such objects to
- cooperate with green threads by sticking them in native threads, at the cost
- of some overhead.
- """
- setup()
- # if already in tpool, don't recurse into the tpool
- # also, call functions directly if we're inside an import lock, because
- # if meth does any importing (sadly common), it will hang
- my_thread = threading.currentThread()
- if my_thread in _threads or imp.lock_held() or _nthreads == 0:
- return meth(*args, **kwargs)
-
- e = event.Event()
- _reqq.put((e, meth, args, kwargs))
-
- rv = e.wait()
- if isinstance(rv, tuple) \
- and len(rv) == 3 \
- and isinstance(rv[1], EXC_CLASSES):
- (c, e, tb) = rv
- if not QUIET:
- traceback.print_exception(c, e, tb)
- traceback.print_stack()
- six.reraise(c, e, tb)
- return rv
-
-
-def proxy_call(autowrap, f, *args, **kwargs):
- """
- Call a function *f* and returns the value. If the type of the return value
- is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
- object before return.
-
- Normally *f* will be called in the threadpool with :func:`execute`; if the
- keyword argument "nonblocking" is set to ``True``, it will simply be
- executed directly. This is useful if you have an object which has methods
- that don't need to be called in a separate thread, but which return objects
- that should be Proxy wrapped.
- """
- if kwargs.pop('nonblocking', False):
- rv = f(*args, **kwargs)
- else:
- rv = execute(f, *args, **kwargs)
- if isinstance(rv, autowrap):
- return Proxy(rv, autowrap)
- else:
- return rv
-
-
-class Proxy(object):
- """
- a simple proxy-wrapper of any object that comes with a
- methods-only interface, in order to forward every method
- invocation onto a thread in the native-thread pool. A key
- restriction is that the object's methods should not switch
- greenlets or use Eventlet primitives, since they are in a
- different thread from the main hub, and therefore might behave
- unexpectedly. This is for running native-threaded code
- only.
-
- It's common to want to have some of the attributes or return
- values also wrapped in Proxy objects (for example, database
- connection objects produce cursor objects which also should be
- wrapped in Proxy objects to remain nonblocking). *autowrap*, if
- supplied, is a collection of types; if an attribute or return
- value matches one of those types (via isinstance), it will be
- wrapped in a Proxy. *autowrap_names* is a collection
- of strings, which represent the names of attributes that should be
- wrapped in Proxy objects when accessed.
- """
-
- def __init__(self, obj, autowrap=(), autowrap_names=()):
- self._obj = obj
- self._autowrap = autowrap
- self._autowrap_names = autowrap_names
-
- def __getattr__(self, attr_name):
- f = getattr(self._obj, attr_name)
- if not hasattr(f, '__call__'):
- if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
- return Proxy(f, self._autowrap)
- return f
-
- def doit(*args, **kwargs):
- result = proxy_call(self._autowrap, f, *args, **kwargs)
- if attr_name in self._autowrap_names and not isinstance(result, Proxy):
- return Proxy(result)
- return result
- return doit
-
- # the following are a buncha methods that the python interpeter
- # doesn't use getattr to retrieve and therefore have to be defined
- # explicitly
- def __getitem__(self, key):
- return proxy_call(self._autowrap, self._obj.__getitem__, key)
-
- def __setitem__(self, key, value):
- return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
-
- def __deepcopy__(self, memo=None):
- return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
-
- def __copy__(self, memo=None):
- return proxy_call(self._autowrap, self._obj.__copy__, memo)
-
- def __call__(self, *a, **kw):
- if '__call__' in self._autowrap_names:
- return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
- else:
- return proxy_call(self._autowrap, self._obj, *a, **kw)
-
- def __enter__(self):
- return proxy_call(self._autowrap, self._obj.__enter__)
-
- def __exit__(self, *exc):
- return proxy_call(self._autowrap, self._obj.__exit__, *exc)
-
- # these don't go through a proxy call, because they're likely to
- # be called often, and are unlikely to be implemented on the
- # wrapped object in such a way that they would block
- def __eq__(self, rhs):
- return self._obj == rhs
-
- def __hash__(self):
- return self._obj.__hash__()
-
- def __repr__(self):
- return self._obj.__repr__()
-
- def __str__(self):
- return self._obj.__str__()
-
- def __len__(self):
- return len(self._obj)
-
- def __nonzero__(self):
- return bool(self._obj)
- # Python3
- __bool__ = __nonzero__
-
- def __iter__(self):
- it = iter(self._obj)
- if it == self._obj:
- return self
- else:
- return Proxy(it)
-
- def next(self):
- return proxy_call(self._autowrap, next, self._obj)
- # Python3
- __next__ = next
-
-
-def setup():
- global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
- if _setup_already:
- return
- else:
- _setup_already = True
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(('127.0.0.1', 0))
- sock.listen(1)
- csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- csock.connect(sock.getsockname())
- _wsock, _addr = sock.accept()
- sock.close()
- _rsock = greenio.GreenSocket(csock)
-
- _reqq = Queue(maxsize=-1)
- _rspq = Queue(maxsize=-1)
- assert _nthreads >= 0, "Can't specify negative number of threads"
- if _nthreads == 0:
- import warnings
- warnings.warn("Zero threads in tpool. All tpool.execute calls will\
- execute in main thread. Check the value of the environment \
- variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
- for i in six.moves.range(_nthreads):
- t = threading.Thread(target=tworker,
- name="tpool_thread_%s" % i)
- t.setDaemon(True)
- t.start()
- _threads.append(t)
-
- _coro = greenthread.spawn_n(tpool_trampoline)
-
-
-def killall():
- global _setup_already, _rspq, _rsock, _wsock
- if not _setup_already:
- return
- for thr in _threads:
- _reqq.put(None)
- for thr in _threads:
- thr.join()
- del _threads[:]
- if _coro is not None:
- greenthread.kill(_coro)
- _rsock.close()
- _wsock.close()
- _rsock = None
- _wsock = None
- _rspq = None
- _setup_already = False
-
-
-def set_num_threads(nthreads):
- global _nthreads
- _nthreads = nthreads