1 # Copyright (c) 2007-2009, Linden Research, Inc.
2 # Copyright (c) 2007, IBM Corp.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 from eventlet import event, greenio, greenthread, patcher, timeout
24 from eventlet.support import six
# NOTE(review): every line of this listing starts with a fused line number and
# some lines have been dropped (the numbers jump); the numbers must be stripped
# and the missing lines restored from a pristine copy before this can run.

# Names exported by ``from <module> import *``.
26 __all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
# execute() treats a tuple result whose second item is an instance of one of
# these classes as exception info and re-raises it in the caller.
29 EXC_CLASSES = (Exception, timeout.Timeout)
30 SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
# presumably the unpatched standard-library modules - confirm against
# eventlet.patcher.original
34 socket = patcher.original('socket')
35 threading = patcher.original('threading')
# NOTE(review): both assignments below are unconditional in this listing, so
# the second always wins.  'Queue' is the Python 2 name of the module that
# Python 3 calls 'queue'; a version guard around each line has presumably been
# lost - confirm.
37 Queue_module = patcher.original('Queue')
39 Queue_module = patcher.original('queue')
41 Empty = Queue_module.Empty
42 Queue = Queue_module.Queue
# Number of pool threads; read once at import time from the environment
# variable EVENTLET_THREADPOOL_SIZE, defaulting to 20.
# NOTE(review): no ``import os`` is visible in this listing - confirm it exists.
46 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
# Socket pair and "already initialised" flag; all three are rebound by the
# setup and teardown code further down the file.
48 _rsock = _wsock = None
49 _setup_already = False
53 def tpool_trampoline():
# NOTE(review): this function is incomplete in the listing.  The fused line
# numbers jump from 53 to 59 and skip 60 and 63, so the loop and the
# try/except that the ``break`` and the comments below belong to are not
# visible here.  Restore them from a pristine copy before editing.
59 # FIXME: this is probably redundant since using sockets instead of pipe now
61 break # will be raised when pipe is closed
# Drain the response queue without blocking; each item unpacks into a pair
# (e, rv).  What is done with the pair is on lines not shown here.
62 while not _rspq.empty():
64 (e, rv) = _rspq.get(block=False)
76 except AttributeError:
77 return # can't get anything off of a dud queue
80 (e, meth, args, kwargs) = msg
83 rv = meth(*args, **kwargs)
88 # test_leakage_from_tracebacks verifies that the use of
89 # exc_info does not lead to memory leaks
91 msg = meth = args = kwargs = e = rv = None
92 _wsock.sendall(_bytetosend)
95 def execute(meth, *args, **kwargs):
# NOTE(review): this function is incomplete in the listing.  The docstring
# delimiters, the end of the docstring's last sentence, the creation of ``e``,
# the statement that produces ``rv``, the unpacking of ``c, e, tb`` and the
# final return are all on lines that are not visible here (the fused line
# numbers jump).  Restore them from a pristine copy before editing.
97 Execute *meth* in a Python thread, blocking the current coroutine/
98 greenthread until the method completes.
100 The primary use case for this is to wrap an object or module that is not
101 amenable to monkeypatching or any of the other tricks that Eventlet uses
102 to achieve cooperative yielding. With tpool, you can force such objects to
103 cooperate with green threads by sticking them in native threads, at the cost
107 # if already in tpool, don't recurse into the tpool
108 # also, call functions directly if we're inside an import lock, because
109 # if meth does any importing (sadly common), it will hang
# NOTE(review): threading.currentThread() is a deprecated alias of
# threading.current_thread(), and the imp module no longer exists on
# Python 3.12+ - confirm which interpreter versions this must support.
110 my_thread = threading.currentThread()
# Run *meth* inline when called from a pool thread, while the import lock is
# held, or when the pool is configured with zero threads.
111 if my_thread in _threads or imp.lock_held() or _nthreads == 0:
112 return meth(*args, **kwargs)
# Otherwise queue the call as an (e, meth, args, kwargs) request.
115 _reqq.put((e, meth, args, kwargs))
# A tuple result whose second item is an EXC_CLASSES instance is treated as
# exception info: the traceback and the current stack are printed and the
# exception is re-raised in the caller.
118 if isinstance(rv, tuple) \
120 and isinstance(rv[1], EXC_CLASSES):
123 traceback.print_exception(c, e, tb)
124 traceback.print_stack()
125 six.reraise(c, e, tb)
def proxy_call(autowrap, f, *args, **kwargs):
    """
    Call a function *f* and return its value.  If the type of the return
    value is in the *autowrap* collection, then it is wrapped in a
    :class:`Proxy` object before return.

    Normally *f* will be called in the threadpool with :func:`execute`; if the
    keyword argument "nonblocking" is set to ``True``, it will simply be
    executed directly.  This is useful if you have an object which has methods
    that don't need to be called in a separate thread, but which return objects
    that should be Proxy wrapped.

    The "nonblocking" keyword is consumed here and is never forwarded to *f*.
    """
    # pop() strips the control flag so that *f* only sees its own keywords
    if kwargs.pop('nonblocking', False):
        rv = f(*args, **kwargs)
    else:
        rv = execute(f, *args, **kwargs)
    if isinstance(rv, autowrap):
        return Proxy(rv, autowrap)
    return rv
153 a simple proxy-wrapper of any object that comes with a
154 methods-only interface, in order to forward every method
155 invocation onto a thread in the native-thread pool. A key
156 restriction is that the object's methods should not switch
157 greenlets or use Eventlet primitives, since they are in a
158 different thread from the main hub, and therefore might behave
159 unexpectedly. This is for running native-threaded code
162 It's common to want to have some of the attributes or return
163 values also wrapped in Proxy objects (for example, database
164 connection objects produce cursor objects which also should be
165 wrapped in Proxy objects to remain nonblocking). *autowrap*, if
166 supplied, is a collection of types; if an attribute or return
167 value matches one of those types (via isinstance), it will be
168 wrapped in a Proxy. *autowrap_names* is a collection
169 of strings, which represent the names of attributes that should be
170 wrapped in Proxy objects when accessed.
def __init__(self, obj, autowrap=(), autowrap_names=()):
    """Wrap *obj* so that its methods are forwarded through proxy_call.

    :param obj: the object to wrap; stored as ``self._obj``.
    :param autowrap: types whose instances are wrapped in a Proxy when they
        show up as attribute values or return values (checked with
        isinstance).
    :param autowrap_names: names of attributes that are wrapped in a Proxy
        when accessed.
    """
    # Every other method reads self._obj; it has to be stored here, otherwise
    # the lookup falls through to __getattr__, which itself reads self._obj.
    self._obj = obj
    self._autowrap = autowrap
    self._autowrap_names = autowrap_names
def __getattr__(self, attr_name):
    """Resolve *attr_name* on the wrapped object.

    A non-callable attribute is returned as is, unless its type is listed in
    ``self._autowrap`` or its name is listed in ``self._autowrap_names``, in
    which case it is wrapped in a Proxy.

    A callable attribute is returned as a wrapper function that invokes it
    through proxy_call; the wrapper also wraps the result in a Proxy when
    *attr_name* is listed in ``self._autowrap_names`` and the result is not
    a Proxy already.
    """
    f = getattr(self._obj, attr_name)
    if not hasattr(f, '__call__'):
        if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
            return Proxy(f, self._autowrap)
        return f

    def doit(*args, **kwargs):
        result = proxy_call(self._autowrap, f, *args, **kwargs)
        # proxy_call may already have wrapped the result; don't wrap twice
        if attr_name in self._autowrap_names and not isinstance(result, Proxy):
            return Proxy(result)
        return result
    return doit
# The following are a bunch of methods that the Python interpreter
# doesn't use getattr to retrieve and that therefore have to be defined
# explicitly.
def __getitem__(self, key):
    """Return ``self._obj[key]``, fetched through proxy_call.

    The result is wrapped in a Proxy when its type is in ``self._autowrap``.
    """
    return proxy_call(self._autowrap, self._obj.__getitem__, key)
def __setitem__(self, key, value):
    """Perform ``self._obj[key] = value`` through proxy_call."""
    return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
def __deepcopy__(self, memo=None):
    """Deep-copy the wrapped object through proxy_call.

    *memo* is handed to the wrapped object's ``__deepcopy__`` unchanged.
    The copy is wrapped in a Proxy only when its type is in
    ``self._autowrap``.
    """
    return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
def __copy__(self, memo=None):
    """Shallow-copy the wrapped object through proxy_call.

    The ``__copy__`` protocol takes no arguments, so the wrapped object's
    ``__copy__`` is called without any when *memo* is left at its default.
    Previously ``None`` was always passed along, which makes a standard
    ``__copy__(self)`` implementation raise TypeError.  An explicitly
    supplied *memo* is still forwarded, as before.

    The copy is wrapped in a Proxy only when its type is in
    ``self._autowrap``.
    """
    copier = self._obj.__copy__
    if memo is None:
        return proxy_call(self._autowrap, copier)
    return proxy_call(self._autowrap, copier, memo)
def __call__(self, *a, **kw):
    """Call the wrapped object through proxy_call and return the result.

    When ``'__call__'`` is listed in ``self._autowrap_names`` the result is
    wrapped in a Proxy, unless proxy_call has already wrapped it; this
    matches the check made for named methods in ``__getattr__``.

    A ``nonblocking`` keyword, if given, is consumed by proxy_call.
    """
    result = proxy_call(self._autowrap, self._obj, *a, **kw)
    if '__call__' in self._autowrap_names and not isinstance(result, Proxy):
        return Proxy(result)
    return result
214 return proxy_call(self._autowrap, self._obj.__enter__)
def __exit__(self, *exc):
    """Leave the wrapped object's context through proxy_call.

    *exc* is forwarded unchanged to the wrapped ``__exit__`` and its return
    value is handed back to the caller.
    """
    return proxy_call(self._autowrap, self._obj.__exit__, *exc)
219 # these don't go through a proxy call, because they're likely to
220 # be called often, and are unlikely to be implemented on the
221 # wrapped object in such a way that they would block
def __eq__(self, rhs):
    """Compare the wrapped object with *rhs*.

    Evaluated directly in the calling thread rather than through a proxy
    call.
    """
    return self._obj == rhs
226 return self._obj.__hash__()
229 return self._obj.__repr__()
232 return self._obj.__str__()
235 return len(self._obj)
def __nonzero__(self):
    """Return the truth value of the wrapped object.

    Evaluated directly in the calling thread rather than through a proxy
    call.
    """
    return bool(self._obj)

# Python 3 looks up __bool__ where Python 2 looks up __nonzero__
__bool__ = __nonzero__
250 return proxy_call(self._autowrap, next, self._obj)
256 global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
260 _setup_already = True
262 assert _nthreads >= 0, "Can't specify negative number of threads"
265 warnings.warn("Zero threads in tpool. All tpool.execute calls will\
266 execute in main thread. Check the value of the environment \
267 variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
268 _reqq = Queue(maxsize=-1)
269 _rspq = Queue(maxsize=-1)
271 # connected socket pair
272 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
273 sock.bind(('127.0.0.1', 0))
275 csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
276 csock.connect(sock.getsockname())
277 csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
278 _wsock, _addr = sock.accept()
279 _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
281 _rsock = greenio.GreenSocket(csock)
283 for i in six.moves.range(_nthreads):
284 t = threading.Thread(target=tworker,
285 name="tpool_thread_%s" % i)
290 _coro = greenthread.spawn_n(tpool_trampoline)
291 # This yield fixes subtle error with GreenSocket.__del__
295 # Avoid ResourceWarning unclosed socket on Python3.2+
298 global _setup_already, _rspq, _rsock, _wsock
299 if not _setup_already:
302 # This yield fixes freeze in some scenarios
311 # return any remaining results
312 while (_rspq is not None) and not _rspq.empty():
314 (e, rv) = _rspq.get(block=False)
320 if _coro is not None:
321 greenthread.kill(_coro)
322 if _rsock is not None:
325 if _wsock is not None:
329 _setup_already = False
332 def set_num_threads(nthreads):