Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / tpool.py
1 # Copyright (c) 2007-2009, Linden Research, Inc.
2 # Copyright (c) 2007, IBM Corp.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #   http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import atexit
17 import imp
18 import os
19 import sys
20 import traceback
21
22 import eventlet
23 from eventlet import event, greenio, greenthread, patcher, timeout
24 from eventlet.support import six
25
26 __all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
27
28
29 EXC_CLASSES = (Exception, timeout.Timeout)
30 SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
31
32 QUIET = True
33
34 socket = patcher.original('socket')
35 threading = patcher.original('threading')
36 if six.PY2:
37     Queue_module = patcher.original('Queue')
38 if six.PY3:
39     Queue_module = patcher.original('queue')
40
41 Empty = Queue_module.Empty
42 Queue = Queue_module.Queue
43
44 _bytetosend = b' '
45 _coro = None
46 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
47 _reqq = _rspq = None
48 _rsock = _wsock = None
49 _setup_already = False
50 _threads = []
51
52
53 def tpool_trampoline():
54     global _rspq
55     while True:
56         try:
57             _c = _rsock.recv(1)
58             assert _c
59         # FIXME: this is probably redundant since using sockets instead of pipe now
60         except ValueError:
61             break  # will be raised when pipe is closed
62         while not _rspq.empty():
63             try:
64                 (e, rv) = _rspq.get(block=False)
65                 e.send(rv)
66                 e = rv = None
67             except Empty:
68                 pass
69
70
71 def tworker():
72     global _rspq
73     while True:
74         try:
75             msg = _reqq.get()
76         except AttributeError:
77             return  # can't get anything off of a dud queue
78         if msg is None:
79             return
80         (e, meth, args, kwargs) = msg
81         rv = None
82         try:
83             rv = meth(*args, **kwargs)
84         except SYS_EXCS:
85             raise
86         except EXC_CLASSES:
87             rv = sys.exc_info()
88         # test_leakage_from_tracebacks verifies that the use of
89         # exc_info does not lead to memory leaks
90         _rspq.put((e, rv))
91         msg = meth = args = kwargs = e = rv = None
92         _wsock.sendall(_bytetosend)
93
94
95 def execute(meth, *args, **kwargs):
96     """
97     Execute *meth* in a Python thread, blocking the current coroutine/
98     greenthread until the method completes.
99
100     The primary use case for this is to wrap an object or module that is not
101     amenable to monkeypatching or any of the other tricks that Eventlet uses
102     to achieve cooperative yielding.  With tpool, you can force such objects to
103     cooperate with green threads by sticking them in native threads, at the cost
104     of some overhead.
105     """
106     setup()
107     # if already in tpool, don't recurse into the tpool
108     # also, call functions directly if we're inside an import lock, because
109     # if meth does any importing (sadly common), it will hang
110     my_thread = threading.currentThread()
111     if my_thread in _threads or imp.lock_held() or _nthreads == 0:
112         return meth(*args, **kwargs)
113
114     e = event.Event()
115     _reqq.put((e, meth, args, kwargs))
116
117     rv = e.wait()
118     if isinstance(rv, tuple) \
119             and len(rv) == 3 \
120             and isinstance(rv[1], EXC_CLASSES):
121         (c, e, tb) = rv
122         if not QUIET:
123             traceback.print_exception(c, e, tb)
124             traceback.print_stack()
125         six.reraise(c, e, tb)
126     return rv
127
128
129 def proxy_call(autowrap, f, *args, **kwargs):
130     """
131     Call a function *f* and returns the value.  If the type of the return value
132     is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
133     object before return.
134
135     Normally *f* will be called in the threadpool with :func:`execute`; if the
136     keyword argument "nonblocking" is set to ``True``, it will simply be
137     executed directly.  This is useful if you have an object which has methods
138     that don't need to be called in a separate thread, but which return objects
139     that should be Proxy wrapped.
140     """
141     if kwargs.pop('nonblocking', False):
142         rv = f(*args, **kwargs)
143     else:
144         rv = execute(f, *args, **kwargs)
145     if isinstance(rv, autowrap):
146         return Proxy(rv, autowrap)
147     else:
148         return rv
149
150
151 class Proxy(object):
152     """
153     a simple proxy-wrapper of any object that comes with a
154     methods-only interface, in order to forward every method
155     invocation onto a thread in the native-thread pool.  A key
156     restriction is that the object's methods should not switch
157     greenlets or use Eventlet primitives, since they are in a
158     different thread from the main hub, and therefore might behave
159     unexpectedly.  This is for running native-threaded code
160     only.
161
162     It's common to want to have some of the attributes or return
163     values also wrapped in Proxy objects (for example, database
164     connection objects produce cursor objects which also should be
165     wrapped in Proxy objects to remain nonblocking).  *autowrap*, if
166     supplied, is a collection of types; if an attribute or return
167     value matches one of those types (via isinstance), it will be
168     wrapped in a Proxy.  *autowrap_names* is a collection
169     of strings, which represent the names of attributes that should be
170     wrapped in Proxy objects when accessed.
171     """
172
173     def __init__(self, obj, autowrap=(), autowrap_names=()):
174         self._obj = obj
175         self._autowrap = autowrap
176         self._autowrap_names = autowrap_names
177
178     def __getattr__(self, attr_name):
179         f = getattr(self._obj, attr_name)
180         if not hasattr(f, '__call__'):
181             if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
182                 return Proxy(f, self._autowrap)
183             return f
184
185         def doit(*args, **kwargs):
186             result = proxy_call(self._autowrap, f, *args, **kwargs)
187             if attr_name in self._autowrap_names and not isinstance(result, Proxy):
188                 return Proxy(result)
189             return result
190         return doit
191
192     # the following are a buncha methods that the python interpeter
193     # doesn't use getattr to retrieve and therefore have to be defined
194     # explicitly
195     def __getitem__(self, key):
196         return proxy_call(self._autowrap, self._obj.__getitem__, key)
197
198     def __setitem__(self, key, value):
199         return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
200
201     def __deepcopy__(self, memo=None):
202         return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
203
204     def __copy__(self, memo=None):
205         return proxy_call(self._autowrap, self._obj.__copy__, memo)
206
207     def __call__(self, *a, **kw):
208         if '__call__' in self._autowrap_names:
209             return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
210         else:
211             return proxy_call(self._autowrap, self._obj, *a, **kw)
212
213     def __enter__(self):
214         return proxy_call(self._autowrap, self._obj.__enter__)
215
216     def __exit__(self, *exc):
217         return proxy_call(self._autowrap, self._obj.__exit__, *exc)
218
219     # these don't go through a proxy call, because they're likely to
220     # be called often, and are unlikely to be implemented on the
221     # wrapped object in such a way that they would block
222     def __eq__(self, rhs):
223         return self._obj == rhs
224
225     def __hash__(self):
226         return self._obj.__hash__()
227
228     def __repr__(self):
229         return self._obj.__repr__()
230
231     def __str__(self):
232         return self._obj.__str__()
233
234     def __len__(self):
235         return len(self._obj)
236
237     def __nonzero__(self):
238         return bool(self._obj)
239     # Python3
240     __bool__ = __nonzero__
241
242     def __iter__(self):
243         it = iter(self._obj)
244         if it == self._obj:
245             return self
246         else:
247             return Proxy(it)
248
249     def next(self):
250         return proxy_call(self._autowrap, next, self._obj)
251     # Python3
252     __next__ = next
253
254
255 def setup():
256     global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
257     if _setup_already:
258         return
259     else:
260         _setup_already = True
261
262     assert _nthreads >= 0, "Can't specify negative number of threads"
263     if _nthreads == 0:
264         import warnings
265         warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
266             execute in main thread.  Check the value of the environment \
267             variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
268     _reqq = Queue(maxsize=-1)
269     _rspq = Queue(maxsize=-1)
270
271     # connected socket pair
272     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
273     sock.bind(('127.0.0.1', 0))
274     sock.listen(1)
275     csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
276     csock.connect(sock.getsockname())
277     csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
278     _wsock, _addr = sock.accept()
279     _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
280     sock.close()
281     _rsock = greenio.GreenSocket(csock)
282
283     for i in six.moves.range(_nthreads):
284         t = threading.Thread(target=tworker,
285                              name="tpool_thread_%s" % i)
286         t.setDaemon(True)
287         t.start()
288         _threads.append(t)
289
290     _coro = greenthread.spawn_n(tpool_trampoline)
291     # This yield fixes subtle error with GreenSocket.__del__
292     eventlet.sleep(0)
293
294
295 # Avoid ResourceWarning unclosed socket on Python3.2+
296 @atexit.register
297 def killall():
298     global _setup_already, _rspq, _rsock, _wsock
299     if not _setup_already:
300         return
301
302     # This yield fixes freeze in some scenarios
303     eventlet.sleep(0)
304
305     for thr in _threads:
306         _reqq.put(None)
307     for thr in _threads:
308         thr.join()
309     del _threads[:]
310
311     # return any remaining results
312     while (_rspq is not None) and not _rspq.empty():
313         try:
314             (e, rv) = _rspq.get(block=False)
315             e.send(rv)
316             e = rv = None
317         except Empty:
318             pass
319
320     if _coro is not None:
321         greenthread.kill(_coro)
322     if _rsock is not None:
323         _rsock.close()
324         _rsock = None
325     if _wsock is not None:
326         _wsock.close()
327         _wsock = None
328     _rspq = None
329     _setup_already = False
330
331
332 def set_num_threads(nthreads):
333     global _nthreads
334     _nthreads = nthreads