Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / tpool.py
diff --git a/eventlet/eventlet/tpool.py b/eventlet/eventlet/tpool.py
new file mode 100644 (file)
index 0000000..e7f0db1
--- /dev/null
@@ -0,0 +1,307 @@
+# Copyright (c) 2007-2009, Linden Research, Inc.
+# Copyright (c) 2007, IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import os
+import sys
+import traceback
+
+from eventlet import event, greenio, greenthread, patcher, timeout
+from eventlet.support import six
+
+__all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
+
+
+EXC_CLASSES = (Exception, timeout.Timeout)
+SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
+
+QUIET = True
+
+socket = patcher.original('socket')
+threading = patcher.original('threading')
+if six.PY2:
+    Queue_module = patcher.original('Queue')
+if six.PY3:
+    Queue_module = patcher.original('queue')
+
+Empty = Queue_module.Empty
+Queue = Queue_module.Queue
+
+_bytetosend = ' '.encode()
+_coro = None
+_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
+_reqq = _rspq = None
+_rsock = _wsock = None
+_setup_already = False
+_threads = []
+
+
+def tpool_trampoline():
+    global _rspq
+    while True:
+        try:
+            _c = _rsock.recv(1)
+            assert _c
+        except ValueError:
+            break  # will be raised when pipe is closed
+        while not _rspq.empty():
+            try:
+                (e, rv) = _rspq.get(block=False)
+                e.send(rv)
+                e = rv = None
+            except Empty:
+                pass
+
+
+def tworker():
+    global _rspq
+    while True:
+        try:
+            msg = _reqq.get()
+        except AttributeError:
+            return  # can't get anything off of a dud queue
+        if msg is None:
+            return
+        (e, meth, args, kwargs) = msg
+        rv = None
+        try:
+            rv = meth(*args, **kwargs)
+        except SYS_EXCS:
+            raise
+        except EXC_CLASSES:
+            rv = sys.exc_info()
+        # test_leakage_from_tracebacks verifies that the use of
+        # exc_info does not lead to memory leaks
+        _rspq.put((e, rv))
+        msg = meth = args = kwargs = e = rv = None
+        _wsock.sendall(_bytetosend)
+
+
+def execute(meth, *args, **kwargs):
+    """
+    Execute *meth* in a Python thread, blocking the current coroutine/
+    greenthread until the method completes.
+
+    The primary use case for this is to wrap an object or module that is not
+    amenable to monkeypatching or any of the other tricks that Eventlet uses
+    to achieve cooperative yielding.  With tpool, you can force such objects to
+    cooperate with green threads by sticking them in native threads, at the cost
+    of some overhead.
+    """
+    setup()
+    # if already in tpool, don't recurse into the tpool
+    # also, call functions directly if we're inside an import lock, because
+    # if meth does any importing (sadly common), it will hang
+    my_thread = threading.currentThread()
+    if my_thread in _threads or imp.lock_held() or _nthreads == 0:
+        return meth(*args, **kwargs)
+
+    e = event.Event()
+    _reqq.put((e, meth, args, kwargs))
+
+    rv = e.wait()
+    if isinstance(rv, tuple) \
+            and len(rv) == 3 \
+            and isinstance(rv[1], EXC_CLASSES):
+        (c, e, tb) = rv
+        if not QUIET:
+            traceback.print_exception(c, e, tb)
+            traceback.print_stack()
+        six.reraise(c, e, tb)
+    return rv
+
+
+def proxy_call(autowrap, f, *args, **kwargs):
+    """
+    Call a function *f* and returns the value.  If the type of the return value
+    is in the *autowrap* collection, then it is wrapped in a :class:`Proxy`
+    object before return.
+
+    Normally *f* will be called in the threadpool with :func:`execute`; if the
+    keyword argument "nonblocking" is set to ``True``, it will simply be
+    executed directly.  This is useful if you have an object which has methods
+    that don't need to be called in a separate thread, but which return objects
+    that should be Proxy wrapped.
+    """
+    if kwargs.pop('nonblocking', False):
+        rv = f(*args, **kwargs)
+    else:
+        rv = execute(f, *args, **kwargs)
+    if isinstance(rv, autowrap):
+        return Proxy(rv, autowrap)
+    else:
+        return rv
+
+
+class Proxy(object):
+    """
+    a simple proxy-wrapper of any object that comes with a
+    methods-only interface, in order to forward every method
+    invocation onto a thread in the native-thread pool.  A key
+    restriction is that the object's methods should not switch
+    greenlets or use Eventlet primitives, since they are in a
+    different thread from the main hub, and therefore might behave
+    unexpectedly.  This is for running native-threaded code
+    only.
+
+    It's common to want to have some of the attributes or return
+    values also wrapped in Proxy objects (for example, database
+    connection objects produce cursor objects which also should be
+    wrapped in Proxy objects to remain nonblocking).  *autowrap*, if
+    supplied, is a collection of types; if an attribute or return
+    value matches one of those types (via isinstance), it will be
+    wrapped in a Proxy.  *autowrap_names* is a collection
+    of strings, which represent the names of attributes that should be
+    wrapped in Proxy objects when accessed.
+    """
+
+    def __init__(self, obj, autowrap=(), autowrap_names=()):
+        self._obj = obj
+        self._autowrap = autowrap
+        self._autowrap_names = autowrap_names
+
+    def __getattr__(self, attr_name):
+        f = getattr(self._obj, attr_name)
+        if not hasattr(f, '__call__'):
+            if isinstance(f, self._autowrap) or attr_name in self._autowrap_names:
+                return Proxy(f, self._autowrap)
+            return f
+
+        def doit(*args, **kwargs):
+            result = proxy_call(self._autowrap, f, *args, **kwargs)
+            if attr_name in self._autowrap_names and not isinstance(result, Proxy):
+                return Proxy(result)
+            return result
+        return doit
+
+    # the following are a buncha methods that the python interpeter
+    # doesn't use getattr to retrieve and therefore have to be defined
+    # explicitly
+    def __getitem__(self, key):
+        return proxy_call(self._autowrap, self._obj.__getitem__, key)
+
+    def __setitem__(self, key, value):
+        return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
+
+    def __deepcopy__(self, memo=None):
+        return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
+
+    def __copy__(self, memo=None):
+        return proxy_call(self._autowrap, self._obj.__copy__, memo)
+
+    def __call__(self, *a, **kw):
+        if '__call__' in self._autowrap_names:
+            return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
+        else:
+            return proxy_call(self._autowrap, self._obj, *a, **kw)
+
+    def __enter__(self):
+        return proxy_call(self._autowrap, self._obj.__enter__)
+
+    def __exit__(self, *exc):
+        return proxy_call(self._autowrap, self._obj.__exit__, *exc)
+
+    # these don't go through a proxy call, because they're likely to
+    # be called often, and are unlikely to be implemented on the
+    # wrapped object in such a way that they would block
+    def __eq__(self, rhs):
+        return self._obj == rhs
+
+    def __hash__(self):
+        return self._obj.__hash__()
+
+    def __repr__(self):
+        return self._obj.__repr__()
+
+    def __str__(self):
+        return self._obj.__str__()
+
+    def __len__(self):
+        return len(self._obj)
+
+    def __nonzero__(self):
+        return bool(self._obj)
+    # Python3
+    __bool__ = __nonzero__
+
+    def __iter__(self):
+        it = iter(self._obj)
+        if it == self._obj:
+            return self
+        else:
+            return Proxy(it)
+
+    def next(self):
+        return proxy_call(self._autowrap, next, self._obj)
+    # Python3
+    __next__ = next
+
+
+def setup():
+    global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
+    if _setup_already:
+        return
+    else:
+        _setup_already = True
+
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.bind(('127.0.0.1', 0))
+    sock.listen(1)
+    csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    csock.connect(sock.getsockname())
+    _wsock, _addr = sock.accept()
+    sock.close()
+    _rsock = greenio.GreenSocket(csock)
+
+    _reqq = Queue(maxsize=-1)
+    _rspq = Queue(maxsize=-1)
+    assert _nthreads >= 0, "Can't specify negative number of threads"
+    if _nthreads == 0:
+        import warnings
+        warnings.warn("Zero threads in tpool.  All tpool.execute calls will\
+            execute in main thread.  Check the value of the environment \
+            variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
+    for i in six.moves.range(_nthreads):
+        t = threading.Thread(target=tworker,
+                             name="tpool_thread_%s" % i)
+        t.setDaemon(True)
+        t.start()
+        _threads.append(t)
+
+    _coro = greenthread.spawn_n(tpool_trampoline)
+
+
+def killall():
+    global _setup_already, _rspq, _rsock, _wsock
+    if not _setup_already:
+        return
+    for thr in _threads:
+        _reqq.put(None)
+    for thr in _threads:
+        thr.join()
+    del _threads[:]
+    if _coro is not None:
+        greenthread.kill(_coro)
+    _rsock.close()
+    _wsock.close()
+    _rsock = None
+    _wsock = None
+    _rspq = None
+    _setup_already = False
+
+
+def set_num_threads(nthreads):
+    global _nthreads
+    _nthreads = nthreads