Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / green / zmq.py
diff --git a/eventlet/eventlet/green/zmq.py b/eventlet/eventlet/green/zmq.py
new file mode 100644 (file)
index 0000000..b08eabd
--- /dev/null
@@ -0,0 +1,359 @@
+# -*- coding: utf-8 -*-
+"""The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context`
+found in :mod:`pyzmq <zmq>` to be non blocking
+"""
+
+from __future__ import with_statement
+
+__zmq__ = __import__('zmq')
+from eventlet import hubs
+from eventlet.patcher import slurp_properties
+from eventlet.support import greenlets as greenlet
+
+__patched__ = ['Context', 'Socket']
+slurp_properties(__zmq__, globals(), ignore=__patched__)
+
+from collections import deque
+
+try:
+    # alias XREQ/XREP to DEALER/ROUTER if available
+    if not hasattr(__zmq__, 'XREQ'):
+        XREQ = DEALER
+    if not hasattr(__zmq__, 'XREP'):
+        XREP = ROUTER
+except NameError:
+    pass
+
+
+class LockReleaseError(Exception):
+    pass
+
+
+class _QueueLock(object):
+    """A Lock that can be acquired by at most one thread. Any other
+    thread calling acquire will be blocked in a queue. When release
+    is called, the threads are awoken in the order they blocked,
+    one at a time. This lock can be required recursively by the same
+    thread."""
+
+    def __init__(self):
+        self._waiters = deque()
+        self._count = 0
+        self._holder = None
+        self._hub = hubs.get_hub()
+
+    def __nonzero__(self):
+        return bool(self._count)
+
+    __bool__ = __nonzero__
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, type, value, traceback):
+        self.release()
+
+    def acquire(self):
+        current = greenlet.getcurrent()
+        if (self._waiters or self._count > 0) and self._holder is not current:
+            # block until lock is free
+            self._waiters.append(current)
+            self._hub.switch()
+            w = self._waiters.popleft()
+
+            assert w is current, 'Waiting threads woken out of order'
+            assert self._count == 0, 'After waking a thread, the lock must be unacquired'
+
+        self._holder = current
+        self._count += 1
+
+    def release(self):
+        if self._count <= 0:
+            raise LockReleaseError("Cannot release unacquired lock")
+
+        self._count -= 1
+        if self._count == 0:
+            self._holder = None
+            if self._waiters:
+                # wake next
+                self._hub.schedule_call_global(0, self._waiters[0].switch)
+
+
+class _BlockedThread(object):
+    """Is either empty, or represents a single blocked thread that
+    blocked itself by calling the block() method. The thread can be
+    awoken by calling wake(). Wake() can be called multiple times and
+    all but the first call will have no effect."""
+
+    def __init__(self):
+        self._blocked_thread = None
+        self._wakeupper = None
+        self._hub = hubs.get_hub()
+
+    def __nonzero__(self):
+        return self._blocked_thread is not None
+
+    __bool__ = __nonzero__
+
+    def block(self):
+        if self._blocked_thread is not None:
+            raise Exception("Cannot block more than one thread on one BlockedThread")
+        self._blocked_thread = greenlet.getcurrent()
+
+        try:
+            self._hub.switch()
+        finally:
+            self._blocked_thread = None
+            # cleanup the wakeup task
+            if self._wakeupper is not None:
+                # Important to cancel the wakeup task so it doesn't
+                # spuriously wake this greenthread later on.
+                self._wakeupper.cancel()
+                self._wakeupper = None
+
+    def wake(self):
+        """Schedules the blocked thread to be awoken and return
+        True. If wake has already been called or if there is no
+        blocked thread, then this call has no effect and returns
+        False."""
+        if self._blocked_thread is not None and self._wakeupper is None:
+            self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
+            return True
+        return False
+
+
+class Context(__zmq__.Context):
+    """Subclass of :class:`zmq.core.context.Context`
+    """
+
+    def socket(self, socket_type):
+        """Overridden method to ensure that the green version of socket is used
+
+        Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
+        that a :class:`Socket` with all of its send and recv methods set to be
+        non-blocking is returned
+        """
+        if self.closed:
+            raise ZMQError(ENOTSUP)
+        return Socket(self, socket_type)
+
+
+def _wraps(source_fn):
+    """A decorator that copies the __name__ and __doc__ from the given
+    function
+    """
+    def wrapper(dest_fn):
+        dest_fn.__name__ = source_fn.__name__
+        dest_fn.__doc__ = source_fn.__doc__
+        return dest_fn
+    return wrapper
+
+# Implementation notes: Each socket in 0mq contains a pipe that the
+# background IO threads use to communicate with the socket. These
+# events are important because they tell the socket when it is able to
+# send and when it has messages waiting to be received. The read end
+# of the events pipe is the same FD that getsockopt(zmq.FD) returns.
+#
+# Events are read from the socket's event pipe only on the thread that
+# the 0mq context is associated with, which is the native thread the
+# greenthreads are running on, and the only operations that cause the
+# events to be read and processed are send(), recv() and
+# getsockopt(zmq.EVENTS). This means that after doing any of these
+# three operations, the ability of the socket to send or receive a
+# message without blocking may have changed, but after the events are
+# read the FD is no longer readable so the hub may not signal our
+# listener.
+#
+# If we understand that after calling send() a message might be ready
+# to be received and that after calling recv() a message might be able
+# to be sent, what should we do next? There are two approaches:
+#
+#  1. Always wake the other thread if there is one waiting. This
+#  wakeup may be spurious because the socket might not actually be
+#  ready for a send() or recv().  However, if a thread is in a
+#  tight-loop successfully calling send() or recv() then the wakeups
+#  are naturally batched and there's very little cost added to each
+#  send/recv call.
+#
+# or
+#
+#  2. Call getsockopt(zmq.EVENTS) and explicitly check if the other
+#  thread should be woken up. This avoids spurious wake-ups but may
+#  add overhead because getsockopt will cause all events to be
+#  processed, whereas send and recv throttle processing
+#  events. Admittedly, all of the events will need to be processed
+#  eventually, but it is likely faster to batch the processing.
+#
+# Which approach is better? I have no idea.
+#
+# TODO:
+# - Support MessageTrackers and make MessageTracker.wait green
+
+_Socket = __zmq__.Socket
+_Socket_recv = _Socket.recv
+_Socket_send = _Socket.send
+_Socket_send_multipart = _Socket.send_multipart
+_Socket_recv_multipart = _Socket.recv_multipart
+_Socket_getsockopt = _Socket.getsockopt
+
+
+class Socket(_Socket):
+    """Green version of :class:`zmq.core.socket.Socket
+
+    The following three methods are always overridden:
+        * send
+        * recv
+        * getsockopt
+    To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
+    is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
+    ``zmq.EAGAIN`` (retry) error is raised
+
+    For some socket types, the following methods are also overridden:
+        * send_multipart
+        * recv_multipart
+    """
+
+    def __init__(self, context, socket_type):
+        super(Socket, self).__init__(context, socket_type)
+
+        self.__dict__['_eventlet_send_event'] = _BlockedThread()
+        self.__dict__['_eventlet_recv_event'] = _BlockedThread()
+        self.__dict__['_eventlet_send_lock'] = _QueueLock()
+        self.__dict__['_eventlet_recv_lock'] = _QueueLock()
+
+        def event(fd):
+            # Some events arrived at the zmq socket. This may mean
+            # there's a message that can be read or there's space for
+            # a message to be written.
+            send_wake = self._eventlet_send_event.wake()
+            recv_wake = self._eventlet_recv_event.wake()
+            if not send_wake and not recv_wake:
+                # if no waiting send or recv thread was woken up, then
+                # force the zmq socket's events to be processed to
+                # avoid repeated wakeups
+                _Socket_getsockopt(self, EVENTS)
+
+        hub = hubs.get_hub()
+        self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
+                                                      self.getsockopt(FD),
+                                                      event,
+                                                      lambda _: None,
+                                                      lambda: None)
+
+    @_wraps(_Socket.close)
+    def close(self, linger=None):
+        super(Socket, self).close(linger)
+        if self._eventlet_listener is not None:
+            hubs.get_hub().remove(self._eventlet_listener)
+            self.__dict__['_eventlet_listener'] = None
+            # wake any blocked threads
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
+
+    @_wraps(_Socket.getsockopt)
+    def getsockopt(self, option):
+        result = _Socket_getsockopt(self, option)
+        if option == EVENTS:
+            # Getting the events causes the zmq socket to process
+            # events which may mean a msg can be sent or received. If
+            # there is a greenthread blocked and waiting for events,
+            # it will miss the edge-triggered read event, so wake it
+            # up.
+            if (result & POLLOUT):
+                self._eventlet_send_event.wake()
+            if (result & POLLIN):
+                self._eventlet_recv_event.wake()
+        return result
+
+    @_wraps(_Socket.send)
+    def send(self, msg, flags=0, copy=True, track=False):
+        """A send method that's safe to use when multiple greenthreads
+        are calling send, send_multipart, recv and recv_multipart on
+        the same socket.
+        """
+        if flags & NOBLOCK:
+            result = _Socket_send(self, msg, flags, copy, track)
+            # Instead of calling both wake methods, could call
+            # self.getsockopt(EVENTS) which would trigger wakeups if
+            # needed.
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
+            return result
+
+        # TODO: pyzmq will copy the message buffer and create Message
+        # objects under some circumstances. We could do that work here
+        # once to avoid doing it every time the send is retried.
+        flags |= NOBLOCK
+        with self._eventlet_send_lock:
+            while True:
+                try:
+                    return _Socket_send(self, msg, flags, copy, track)
+                except ZMQError as e:
+                    if e.errno == EAGAIN:
+                        self._eventlet_send_event.block()
+                    else:
+                        raise
+                finally:
+                    # The call to send processes 0mq events and may
+                    # make the socket ready to recv. Wake the next
+                    # receiver. (Could check EVENTS for POLLIN here)
+                    self._eventlet_recv_event.wake()
+
+    @_wraps(_Socket.send_multipart)
+    def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
+        """A send_multipart method that's safe to use when multiple
+        greenthreads are calling send, send_multipart, recv and
+        recv_multipart on the same socket.
+        """
+        if flags & NOBLOCK:
+            return _Socket_send_multipart(self, msg_parts, flags, copy, track)
+
+        # acquire lock here so the subsequent calls to send for the
+        # message parts after the first don't block
+        with self._eventlet_send_lock:
+            return _Socket_send_multipart(self, msg_parts, flags, copy, track)
+
+    @_wraps(_Socket.recv)
+    def recv(self, flags=0, copy=True, track=False):
+        """A recv method that's safe to use when multiple greenthreads
+        are calling send, send_multipart, recv and recv_multipart on
+        the same socket.
+        """
+        if flags & NOBLOCK:
+            msg = _Socket_recv(self, flags, copy, track)
+            # Instead of calling both wake methods, could call
+            # self.getsockopt(EVENTS) which would trigger wakeups if
+            # needed.
+            self._eventlet_send_event.wake()
+            self._eventlet_recv_event.wake()
+            return msg
+
+        flags |= NOBLOCK
+        with self._eventlet_recv_lock:
+            while True:
+                try:
+                    return _Socket_recv(self, flags, copy, track)
+                except ZMQError as e:
+                    if e.errno == EAGAIN:
+                        self._eventlet_recv_event.block()
+                    else:
+                        raise
+                finally:
+                    # The call to recv processes 0mq events and may
+                    # make the socket ready to send. Wake the next
+                    # receiver. (Could check EVENTS for POLLOUT here)
+                    self._eventlet_send_event.wake()
+
+    @_wraps(_Socket.recv_multipart)
+    def recv_multipart(self, flags=0, copy=True, track=False):
+        """A recv_multipart method that's safe to use when multiple
+        greenthreads are calling send, send_multipart, recv and
+        recv_multipart on the same socket.
+        """
+        if flags & NOBLOCK:
+            return _Socket_recv_multipart(self, flags, copy, track)
+
+        # acquire lock here so the subsequent calls to recv for the
+        # message parts after the first don't block
+        with self._eventlet_recv_lock:
+            return _Socket_recv_multipart(self, flags, copy, track)