Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / green / zmq.py
1 # -*- coding: utf-8 -*-
2 """The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context`
3 found in :mod:`pyzmq <zmq>` to be non blocking
4 """
5
6 from __future__ import with_statement
7
8 __zmq__ = __import__('zmq')
9 from eventlet import hubs
10 from eventlet.patcher import slurp_properties
11 from eventlet.support import greenlets as greenlet
12
13 __patched__ = ['Context', 'Socket']
14 slurp_properties(__zmq__, globals(), ignore=__patched__)
15
16 from collections import deque
17
18 try:
19     # alias XREQ/XREP to DEALER/ROUTER if available
20     if not hasattr(__zmq__, 'XREQ'):
21         XREQ = DEALER
22     if not hasattr(__zmq__, 'XREP'):
23         XREP = ROUTER
24 except NameError:
25     pass
26
27
28 class LockReleaseError(Exception):
29     pass
30
31
32 class _QueueLock(object):
33     """A Lock that can be acquired by at most one thread. Any other
34     thread calling acquire will be blocked in a queue. When release
35     is called, the threads are awoken in the order they blocked,
36     one at a time. This lock can be required recursively by the same
37     thread."""
38
39     def __init__(self):
40         self._waiters = deque()
41         self._count = 0
42         self._holder = None
43         self._hub = hubs.get_hub()
44
45     def __nonzero__(self):
46         return bool(self._count)
47
48     __bool__ = __nonzero__
49
50     def __enter__(self):
51         self.acquire()
52
53     def __exit__(self, type, value, traceback):
54         self.release()
55
56     def acquire(self):
57         current = greenlet.getcurrent()
58         if (self._waiters or self._count > 0) and self._holder is not current:
59             # block until lock is free
60             self._waiters.append(current)
61             self._hub.switch()
62             w = self._waiters.popleft()
63
64             assert w is current, 'Waiting threads woken out of order'
65             assert self._count == 0, 'After waking a thread, the lock must be unacquired'
66
67         self._holder = current
68         self._count += 1
69
70     def release(self):
71         if self._count <= 0:
72             raise LockReleaseError("Cannot release unacquired lock")
73
74         self._count -= 1
75         if self._count == 0:
76             self._holder = None
77             if self._waiters:
78                 # wake next
79                 self._hub.schedule_call_global(0, self._waiters[0].switch)
80
81
82 class _BlockedThread(object):
83     """Is either empty, or represents a single blocked thread that
84     blocked itself by calling the block() method. The thread can be
85     awoken by calling wake(). Wake() can be called multiple times and
86     all but the first call will have no effect."""
87
88     def __init__(self):
89         self._blocked_thread = None
90         self._wakeupper = None
91         self._hub = hubs.get_hub()
92
93     def __nonzero__(self):
94         return self._blocked_thread is not None
95
96     __bool__ = __nonzero__
97
98     def block(self):
99         if self._blocked_thread is not None:
100             raise Exception("Cannot block more than one thread on one BlockedThread")
101         self._blocked_thread = greenlet.getcurrent()
102
103         try:
104             self._hub.switch()
105         finally:
106             self._blocked_thread = None
107             # cleanup the wakeup task
108             if self._wakeupper is not None:
109                 # Important to cancel the wakeup task so it doesn't
110                 # spuriously wake this greenthread later on.
111                 self._wakeupper.cancel()
112                 self._wakeupper = None
113
114     def wake(self):
115         """Schedules the blocked thread to be awoken and return
116         True. If wake has already been called or if there is no
117         blocked thread, then this call has no effect and returns
118         False."""
119         if self._blocked_thread is not None and self._wakeupper is None:
120             self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
121             return True
122         return False
123
124
125 class Context(__zmq__.Context):
126     """Subclass of :class:`zmq.core.context.Context`
127     """
128
129     def socket(self, socket_type):
130         """Overridden method to ensure that the green version of socket is used
131
132         Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
133         that a :class:`Socket` with all of its send and recv methods set to be
134         non-blocking is returned
135         """
136         if self.closed:
137             raise ZMQError(ENOTSUP)
138         return Socket(self, socket_type)
139
140
141 def _wraps(source_fn):
142     """A decorator that copies the __name__ and __doc__ from the given
143     function
144     """
145     def wrapper(dest_fn):
146         dest_fn.__name__ = source_fn.__name__
147         dest_fn.__doc__ = source_fn.__doc__
148         return dest_fn
149     return wrapper
150
151 # Implementation notes: Each socket in 0mq contains a pipe that the
152 # background IO threads use to communicate with the socket. These
153 # events are important because they tell the socket when it is able to
154 # send and when it has messages waiting to be received. The read end
155 # of the events pipe is the same FD that getsockopt(zmq.FD) returns.
156 #
157 # Events are read from the socket's event pipe only on the thread that
158 # the 0mq context is associated with, which is the native thread the
159 # greenthreads are running on, and the only operations that cause the
160 # events to be read and processed are send(), recv() and
161 # getsockopt(zmq.EVENTS). This means that after doing any of these
162 # three operations, the ability of the socket to send or receive a
163 # message without blocking may have changed, but after the events are
164 # read the FD is no longer readable so the hub may not signal our
165 # listener.
166 #
167 # If we understand that after calling send() a message might be ready
168 # to be received and that after calling recv() a message might be able
169 # to be sent, what should we do next? There are two approaches:
170 #
171 #  1. Always wake the other thread if there is one waiting. This
172 #  wakeup may be spurious because the socket might not actually be
173 #  ready for a send() or recv().  However, if a thread is in a
174 #  tight-loop successfully calling send() or recv() then the wakeups
175 #  are naturally batched and there's very little cost added to each
176 #  send/recv call.
177 #
178 # or
179 #
180 #  2. Call getsockopt(zmq.EVENTS) and explicitly check if the other
181 #  thread should be woken up. This avoids spurious wake-ups but may
182 #  add overhead because getsockopt will cause all events to be
183 #  processed, whereas send and recv throttle processing
184 #  events. Admittedly, all of the events will need to be processed
185 #  eventually, but it is likely faster to batch the processing.
186 #
187 # Which approach is better? I have no idea.
188 #
189 # TODO:
190 # - Support MessageTrackers and make MessageTracker.wait green
191
192 _Socket = __zmq__.Socket
193 _Socket_recv = _Socket.recv
194 _Socket_send = _Socket.send
195 _Socket_send_multipart = _Socket.send_multipart
196 _Socket_recv_multipart = _Socket.recv_multipart
197 _Socket_getsockopt = _Socket.getsockopt
198
199
200 class Socket(_Socket):
201     """Green version of :class:`zmq.core.socket.Socket
202
203     The following three methods are always overridden:
204         * send
205         * recv
206         * getsockopt
207     To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
208     is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
209     ``zmq.EAGAIN`` (retry) error is raised
210
211     For some socket types, the following methods are also overridden:
212         * send_multipart
213         * recv_multipart
214     """
215
216     def __init__(self, context, socket_type):
217         super(Socket, self).__init__(context, socket_type)
218
219         self.__dict__['_eventlet_send_event'] = _BlockedThread()
220         self.__dict__['_eventlet_recv_event'] = _BlockedThread()
221         self.__dict__['_eventlet_send_lock'] = _QueueLock()
222         self.__dict__['_eventlet_recv_lock'] = _QueueLock()
223
224         def event(fd):
225             # Some events arrived at the zmq socket. This may mean
226             # there's a message that can be read or there's space for
227             # a message to be written.
228             send_wake = self._eventlet_send_event.wake()
229             recv_wake = self._eventlet_recv_event.wake()
230             if not send_wake and not recv_wake:
231                 # if no waiting send or recv thread was woken up, then
232                 # force the zmq socket's events to be processed to
233                 # avoid repeated wakeups
234                 _Socket_getsockopt(self, EVENTS)
235
236         hub = hubs.get_hub()
237         self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
238                                                       self.getsockopt(FD),
239                                                       event,
240                                                       lambda _: None,
241                                                       lambda: None)
242
243     @_wraps(_Socket.close)
244     def close(self, linger=None):
245         super(Socket, self).close(linger)
246         if self._eventlet_listener is not None:
247             hubs.get_hub().remove(self._eventlet_listener)
248             self.__dict__['_eventlet_listener'] = None
249             # wake any blocked threads
250             self._eventlet_send_event.wake()
251             self._eventlet_recv_event.wake()
252
253     @_wraps(_Socket.getsockopt)
254     def getsockopt(self, option):
255         result = _Socket_getsockopt(self, option)
256         if option == EVENTS:
257             # Getting the events causes the zmq socket to process
258             # events which may mean a msg can be sent or received. If
259             # there is a greenthread blocked and waiting for events,
260             # it will miss the edge-triggered read event, so wake it
261             # up.
262             if (result & POLLOUT):
263                 self._eventlet_send_event.wake()
264             if (result & POLLIN):
265                 self._eventlet_recv_event.wake()
266         return result
267
268     @_wraps(_Socket.send)
269     def send(self, msg, flags=0, copy=True, track=False):
270         """A send method that's safe to use when multiple greenthreads
271         are calling send, send_multipart, recv and recv_multipart on
272         the same socket.
273         """
274         if flags & NOBLOCK:
275             result = _Socket_send(self, msg, flags, copy, track)
276             # Instead of calling both wake methods, could call
277             # self.getsockopt(EVENTS) which would trigger wakeups if
278             # needed.
279             self._eventlet_send_event.wake()
280             self._eventlet_recv_event.wake()
281             return result
282
283         # TODO: pyzmq will copy the message buffer and create Message
284         # objects under some circumstances. We could do that work here
285         # once to avoid doing it every time the send is retried.
286         flags |= NOBLOCK
287         with self._eventlet_send_lock:
288             while True:
289                 try:
290                     return _Socket_send(self, msg, flags, copy, track)
291                 except ZMQError as e:
292                     if e.errno == EAGAIN:
293                         self._eventlet_send_event.block()
294                     else:
295                         raise
296                 finally:
297                     # The call to send processes 0mq events and may
298                     # make the socket ready to recv. Wake the next
299                     # receiver. (Could check EVENTS for POLLIN here)
300                     self._eventlet_recv_event.wake()
301
302     @_wraps(_Socket.send_multipart)
303     def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
304         """A send_multipart method that's safe to use when multiple
305         greenthreads are calling send, send_multipart, recv and
306         recv_multipart on the same socket.
307         """
308         if flags & NOBLOCK:
309             return _Socket_send_multipart(self, msg_parts, flags, copy, track)
310
311         # acquire lock here so the subsequent calls to send for the
312         # message parts after the first don't block
313         with self._eventlet_send_lock:
314             return _Socket_send_multipart(self, msg_parts, flags, copy, track)
315
316     @_wraps(_Socket.recv)
317     def recv(self, flags=0, copy=True, track=False):
318         """A recv method that's safe to use when multiple greenthreads
319         are calling send, send_multipart, recv and recv_multipart on
320         the same socket.
321         """
322         if flags & NOBLOCK:
323             msg = _Socket_recv(self, flags, copy, track)
324             # Instead of calling both wake methods, could call
325             # self.getsockopt(EVENTS) which would trigger wakeups if
326             # needed.
327             self._eventlet_send_event.wake()
328             self._eventlet_recv_event.wake()
329             return msg
330
331         flags |= NOBLOCK
332         with self._eventlet_recv_lock:
333             while True:
334                 try:
335                     return _Socket_recv(self, flags, copy, track)
336                 except ZMQError as e:
337                     if e.errno == EAGAIN:
338                         self._eventlet_recv_event.block()
339                     else:
340                         raise
341                 finally:
342                     # The call to recv processes 0mq events and may
343                     # make the socket ready to send. Wake the next
344                     # receiver. (Could check EVENTS for POLLOUT here)
345                     self._eventlet_send_event.wake()
346
347     @_wraps(_Socket.recv_multipart)
348     def recv_multipart(self, flags=0, copy=True, track=False):
349         """A recv_multipart method that's safe to use when multiple
350         greenthreads are calling send, send_multipart, recv and
351         recv_multipart on the same socket.
352         """
353         if flags & NOBLOCK:
354             return _Socket_recv_multipart(self, flags, copy, track)
355
356         # acquire lock here so the subsequent calls to recv for the
357         # message parts after the first don't block
358         with self._eventlet_recv_lock:
359             return _Socket_recv_multipart(self, flags, copy, track)