Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / tpool.py
1 # Copyright (c) 2007-2009, Linden Research, Inc.
2 # Copyright (c) 2007, IBM Corp.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #   http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import imp
17 import os
18 import sys
19 import traceback
20
21 from eventlet import event, greenio, greenthread, patcher, timeout
22 from eventlet.support import six
23
24 __all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
25
26
27 EXC_CLASSES = (Exception, timeout.Timeout)
28 SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
29
30 QUIET = True
31
32 socket = patcher.original('socket')
33 threading = patcher.original('threading')
34 if six.PY2:
35     Queue_module = patcher.original('Queue')
36 if six.PY3:
37     Queue_module = patcher.original('queue')
38
39 Empty = Queue_module.Empty
40 Queue = Queue_module.Queue
41
42 _bytetosend = ' '.encode()
43 _coro = None
44 _nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
45 _reqq = _rspq = None
46 _rsock = _wsock = None
47 _setup_already = False
48 _threads = []
49
50
51 def tpool_trampoline():
52     global _rspq
53     while True:
54         try:
55             _c = _rsock.recv(1)
56             assert _c
57         except ValueError:
58             break  # will be raised when pipe is closed
59         while not _rspq.empty():
60             try:
61                 (e, rv) = _rspq.get(block=False)
62                 e.send(rv)
63                 e = rv = None
64             except Empty:
65                 pass
66
67
68 def tworker():
69     global _rspq
70     while True:
71         try:
72             msg = _reqq.get()
73         except AttributeError:
74             return  # can't get anything off of a dud queue
75         if msg is None:
76             return
77         (e, meth, args, kwargs) = msg
78         rv = None
79         try:
80             rv = meth(*args, **kwargs)
81         except SYS_EXCS:
82             raise
83         except EXC_CLASSES:
84             rv = sys.exc_info()
85         # test_leakage_from_tracebacks verifies that the use of
86         # exc_info does not lead to memory leaks
87         _rspq.put((e, rv))
88         msg = meth = args = kwargs = e = rv = None
89         _wsock.sendall(_bytetosend)
90
91
92 def execute(meth, *args, **kwargs):
93     """
94     Execute *meth* in a Python thread, blocking the current coroutine/
95     greenthread until the method completes.
96
97     The primary use case for this is to wrap an object or module that is not
98     amenable to monkeypatching or any of the other tricks that Eventlet uses
99     to achieve cooperative yielding.  With tpool, you can force such objects to
100     cooperate with green threads by sticking them in native threads, at the cost
101     of some overhead.
102     """
103     setup()
104     # if already in tpool, don't recurse into the tpool
105     # also, call functions directly if we're inside an import lock, because
106     # if meth does any importing (sadly common), it will hang
107     my_thread = threading.currentThread()
108     if my_thread in _threads or imp.lock_held() or _nthreads == 0:
109         return meth(*args, **kwargs)
110
111     e = event.Event()
112     _reqq.put((e, meth, args, kwargs))
113
114     rv = e.wait()
115     if isinstance(rv, tuple) \
116             and len(rv) == 3 \
117             and isinstance(rv[1], EXC_CLASSES):
118         (c, e, tb) = rv
119         if not QUIET:
120             traceback.print_exception(c, e, tb)
121             traceback.print_stack()
122         six.reraise(c, e, tb)
123     return rv
124
125
126 def proxy_call(autowrap, f, *args, **kwargs):
127     """
128     Call a function *f* and returns the value.  If the type of the return value
129     is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
130     object before return.
131
132     Normally *f* will be called in the threadpool with :func:`execute`; if the
133     keyword argument "nonblocking" is set to ``True``, it will simply be
134     executed directly.  This is useful if you have an object which has methods
135     that don't need to be called in a separate thread, but which return objects
136     that should be Proxy wrapped.
137     """
138     if kwargs.pop('nonblocking', False):
139         rv = f(*args, **kwargs)
140     else:
141         rv = execute(f, *args, **kwargs)
142     if isinstance(rv, autowrap):
143         return Proxy(rv, autowrap)
144     else:
145         return rv
146
147
148 class Proxy(object):
149     """
150     a simple proxy-wrapper of any object that comes with a
151     methods-only interface, in order to forward every method
152     invocation onto a thread in the native-thread pool.  A key
153     restriction is that the object's methods should not switch
154     greenlets or use Eventlet primitives, since they are in a
155     different thread from the main hub, and therefore might behave
156     unexpectedly.  This is for running native-threaded code
157     only.
158
159     It's common to want to have some of the attributes or return
160     values also wrapped in Proxy objects (for example, database
161     connection objects produce cursor objects which also should be
162     wrapped in Proxy objects to remain nonblocking).  *autowrap*, if
163     supplied, is a collection of types; if an attribute or return
164     value matches one of those types (via isinstance), it will be
165     wrapped in a Proxy.  *autowrap_names* is a collection
166     of strings, which represent the names of attributes that should be
167     wrapped in Proxy objects when accessed.
168     """
169
170     def __init__(self, obj, autowrap=(), autowrap_names=()):
171         self._obj = obj
172         self._autowrap = autowrap
173         self._autowrap_names = autowrap_names
174
175     def __getattr__(self, attr_name):
176         f = getattr(self._obj, attr_name)
177         if not hasattr(f, '__call__'):
178             if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
179                 return Proxy(f, self._autowrap)
180             return f
181
182         def doit(*args, **kwargs):
183             result = proxy_call(self._autowrap, f, *args, **kwargs)
184             if attr_name in self._autowrap_names and not isinstance(result, Proxy):
185                 return Proxy(result)
186             return result
187         return doit
188
189     # the following are a buncha methods that the python interpeter
190     # doesn't use getattr to retrieve and therefore have to be defined
191     # explicitly
192     def __getitem__(self, key):
193         return proxy_call(self._autowrap, self._obj.__getitem__, key)
194
195     def __setitem__(self, key, value):
196         return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
197
198     def __deepcopy__(self, memo=None):
199         return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
200
201     def __copy__(self, memo=None):
202         return proxy_call(self._autowrap, self._obj.__copy__, memo)
203
204     def __call__(self, *a, **kw):
205         if '__call__' in self._autowrap_names:
206             return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
207         else:
208             return proxy_call(self._autowrap, self._obj, *a, **kw)
209
210     def __enter__(self):
211         return proxy_call(self._autowrap, self._obj.__enter__)
212
213     def __exit__(self, *exc):
214         return proxy_call(self._autowrap, self._obj.__exit__, *exc)
215
216     # these don't go through a proxy call, because they're likely to
217     # be called often, and are unlikely to be implemented on the
218     # wrapped object in such a way that they would block
219     def __eq__(self, rhs):
220         return self._obj == rhs
221
222     def __hash__(self):
223         return self._obj.__hash__()
224
225     def __repr__(self):
226         return self._obj.__repr__()
227
228     def __str__(self):
229         return self._obj.__str__()
230
231     def __len__(self):
232         return len(self._obj)
233
234     def __nonzero__(self):
235         return bool(self._obj)
236     # Python3
237     __bool__ = __nonzero__
238
239     def __iter__(self):
240         it = iter(self._obj)
241         if it == self._obj:
242             return self
243         else:
244             return Proxy(it)
245
246     def next(self):
247         return proxy_call(self._autowrap, next, self._obj)
248     # Python3
249     __next__ = next
250
251
252 def setup():
253     global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
254     if _setup_already:
255         return
256     else:
257         _setup_already = True
258
259     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
260     sock.bind(('127.0.0.1', 0))
261     sock.listen(1)
262     csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
263     csock.connect(sock.getsockname())
264     _wsock, _addr = sock.accept()
265     sock.close()
266     _rsock = greenio.GreenSocket(csock)
267
268     _reqq = Queue(maxsize=-1)
269     _rspq = Queue(maxsize=-1)
270     assert _nthreads >= 0, "Can't specify negative number of threads"
271     if _nthreads == 0:
272         import warnings
273         warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
274             execute in main thread.  Check the value of the environment \
275             variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
276     for i in six.moves.range(_nthreads):
277         t = threading.Thread(target=tworker,
278                              name="tpool_thread_%s" % i)
279         t.setDaemon(True)
280         t.start()
281         _threads.append(t)
282
283     _coro = greenthread.spawn_n(tpool_trampoline)
284
285
286 def killall():
287     global _setup_already, _rspq, _rsock, _wsock
288     if not _setup_already:
289         return
290     for thr in _threads:
291         _reqq.put(None)
292     for thr in _threads:
293         thr.join()
294     del _threads[:]
295     if _coro is not None:
296         greenthread.kill(_coro)
297     _rsock.close()
298     _wsock.close()
299     _rsock = None
300     _wsock = None
301     _rspq = None
302     _setup_already = False
303
304
305 def set_num_threads(nthreads):
306     global _nthreads
307     _nthreads = nthreads