1 # Copyright (c) 2007-2009, Linden Research, Inc.
2 # Copyright (c) 2007, IBM Corp.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
21 from eventlet import event, greenio, greenthread, patcher, timeout
22 from eventlet.support import six
24 __all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
27 EXC_CLASSES = (Exception, timeout.Timeout)
28 SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
32 socket = patcher.original('socket')
33 threading = patcher.original('threading')
35 Queue_module = patcher.original('Queue')
37 Queue_module = patcher.original('queue')
39 Empty = Queue_module.Empty
40 Queue = Queue_module.Queue
42 _bytetosend = ' '.encode()
44 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
46 _rsock = _wsock = None
47 _setup_already = False
51 def tpool_trampoline():
58 break # will be raised when pipe is closed
59 while not _rspq.empty():
61 (e, rv) = _rspq.get(block=False)
73 except AttributeError:
74 return # can't get anything off of a dud queue
77 (e, meth, args, kwargs) = msg
80 rv = meth(*args, **kwargs)
85 # test_leakage_from_tracebacks verifies that the use of
86 # exc_info does not lead to memory leaks
88 msg = meth = args = kwargs = e = rv = None
89 _wsock.sendall(_bytetosend)
92 def execute(meth, *args, **kwargs):
94 Execute *meth* in a Python thread, blocking the current coroutine/
95 greenthread until the method completes.
97 The primary use case for this is to wrap an object or module that is not
98 amenable to monkeypatching or any of the other tricks that Eventlet uses
99 to achieve cooperative yielding. With tpool, you can force such objects to
100 cooperate with green threads by sticking them in native threads, at the cost
104 # if already in tpool, don't recurse into the tpool
105 # also, call functions directly if we're inside an import lock, because
106 # if meth does any importing (sadly common), it will hang
107 my_thread = threading.currentThread()
108 if my_thread in _threads or imp.lock_held() or _nthreads == 0:
109 return meth(*args, **kwargs)
112 _reqq.put((e, meth, args, kwargs))
115 if isinstance(rv, tuple) \
117 and isinstance(rv[1], EXC_CLASSES):
120 traceback.print_exception(c, e, tb)
121 traceback.print_stack()
122 six.reraise(c, e, tb)
126 def proxy_call(autowrap, f, *args, **kwargs):
128 Call a function *f* and returns the value. If the type of the return value
129 is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
130 object before return.
132 Normally *f* will be called in the threadpool with :func:`execute`; if the
133 keyword argument "nonblocking" is set to ``True``, it will simply be
134 executed directly. This is useful if you have an object which has methods
135 that don't need to be called in a separate thread, but which return objects
136 that should be Proxy wrapped.
138 if kwargs.pop('nonblocking', False):
139 rv = f(*args, **kwargs)
141 rv = execute(f, *args, **kwargs)
142 if isinstance(rv, autowrap):
143 return Proxy(rv, autowrap)
150 a simple proxy-wrapper of any object that comes with a
151 methods-only interface, in order to forward every method
152 invocation onto a thread in the native-thread pool. A key
153 restriction is that the object's methods should not switch
154 greenlets or use Eventlet primitives, since they are in a
155 different thread from the main hub, and therefore might behave
156 unexpectedly. This is for running native-threaded code
159 It's common to want to have some of the attributes or return
160 values also wrapped in Proxy objects (for example, database
161 connection objects produce cursor objects which also should be
162 wrapped in Proxy objects to remain nonblocking). *autowrap*, if
163 supplied, is a collection of types; if an attribute or return
164 value matches one of those types (via isinstance), it will be
165 wrapped in a Proxy. *autowrap_names* is a collection
166 of strings, which represent the names of attributes that should be
167 wrapped in Proxy objects when accessed.
170 def __init__(self, obj, autowrap=(), autowrap_names=()):
172 self._autowrap = autowrap
173 self._autowrap_names = autowrap_names
175 def __getattr__(self, attr_name):
176 f = getattr(self._obj, attr_name)
177 if not hasattr(f, '__call__'):
178 if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
179 return Proxy(f, self._autowrap)
182 def doit(*args, **kwargs):
183 result = proxy_call(self._autowrap, f, *args, **kwargs)
184 if attr_name in self._autowrap_names and not isinstance(result, Proxy):
189 # the following are a bunch of methods that the Python interpreter
190 # doesn't use getattr to retrieve and therefore have to be defined
def __getitem__(self, key):
    """Forward ``self[key]`` to the wrapped object through :func:`proxy_call`.

    Subscription is not routed through ``__getattr__``, so the hook is
    spelled out here. The wrapped object's bound ``__getitem__`` is handed
    to ``proxy_call`` together with this proxy's *autowrap* types, and
    whatever ``proxy_call`` produces is returned.
    """
    lookup = self._obj.__getitem__
    return proxy_call(self._autowrap, lookup, key)
def __setitem__(self, key, value):
    """Forward ``self[key] = value`` to the wrapped object via :func:`proxy_call`.

    The wrapped object's bound ``__setitem__`` is passed to ``proxy_call``
    along with this proxy's *autowrap* types; the result of that call is
    returned unchanged.
    """
    store = self._obj.__setitem__
    return proxy_call(self._autowrap, store, key, value)
def __deepcopy__(self, memo=None):
    """Deep-copy hook: delegate to the wrapped object's ``__deepcopy__``.

    :param memo: memo object forwarded as-is to the wrapped
        ``__deepcopy__`` (``None`` when the caller supplies nothing).
    :returns: the value :func:`proxy_call` yields for that call, using
        this proxy's *autowrap* types.
    """
    deep_copier = self._obj.__deepcopy__
    return proxy_call(self._autowrap, deep_copier, memo)
def __copy__(self, memo=None):
    """Shallow-copy hook: run the wrapped object's ``__copy__`` via :func:`proxy_call`.

    :param memo: optional extra argument, kept for backward compatibility.
        When it is ``None`` -- which is how :func:`copy.copy` invokes this
        hook -- the wrapped ``__copy__`` is called with no arguments, as
        the :mod:`copy` protocol specifies. Forwarding ``None`` (the old
        behaviour) made a conventional zero-argument ``__copy__`` raise
        ``TypeError``. A non-``None`` value is still passed through
        unchanged.
    :returns: the value :func:`proxy_call` yields for the wrapped call,
        using this proxy's *autowrap* types.
    """
    copier = self._obj.__copy__
    if memo is None:
        # Standard protocol: __copy__ takes no arguments besides self.
        return proxy_call(self._autowrap, copier)
    # An explicit memo was supplied by the caller; keep forwarding it.
    return proxy_call(self._autowrap, copier, memo)
204 def __call__(self, *a, **kw):
205 if '__call__' in self._autowrap_names:
206 return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
208 return proxy_call(self._autowrap, self._obj, *a, **kw)
211 return proxy_call(self._autowrap, self._obj.__enter__)
def __exit__(self, *exc):
    """Leave the wrapped object's context via :func:`proxy_call`.

    :param exc: the positional arguments given to ``__exit__``; they are
        forwarded untouched to the wrapped object's ``__exit__``.
    :returns: the value :func:`proxy_call` yields for that call, using
        this proxy's *autowrap* types.
    """
    leave = self._obj.__exit__
    return proxy_call(self._autowrap, leave, *exc)
216 # these don't go through a proxy call, because they're likely to
217 # be called often, and are unlikely to be implemented on the
218 # wrapped object in such a way that they would block
219 def __eq__(self, rhs):
220 return self._obj == rhs
223 return self._obj.__hash__()
226 return self._obj.__repr__()
229 return self._obj.__str__()
232 return len(self._obj)
234 def __nonzero__(self):
235 return bool(self._obj)
237 __bool__ = __nonzero__
247 return proxy_call(self._autowrap, next, self._obj)
253 global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
257 _setup_already = True
259 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
260 sock.bind(('127.0.0.1', 0))
262 csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
263 csock.connect(sock.getsockname())
264 _wsock, _addr = sock.accept()
266 _rsock = greenio.GreenSocket(csock)
268 _reqq = Queue(maxsize=-1)
269 _rspq = Queue(maxsize=-1)
270 assert _nthreads >= 0, "Can't specify negative number of threads"
273 warnings.warn("Zero threads in tpool. All tpool.execute calls will\
274 execute in main thread. Check the value of the environment \
275 variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
276 for i in six.moves.range(_nthreads):
277 t = threading.Thread(target=tworker,
278 name="tpool_thread_%s" % i)
283 _coro = greenthread.spawn_n(tpool_trampoline)
287 global _setup_already, _rspq, _rsock, _wsock
288 if not _setup_already:
296 # return any remaining results
297 while not _rspq.empty():
299 (e, rv) = _rspq.get(block=False)
305 if _coro is not None:
306 greenthread.kill(_coro)
312 _setup_already = False
315 def set_num_threads(nthreads):