1 # -*- coding: utf-8 -*-
2 """The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context`
3 found in :mod:`pyzmq <zmq>` to be non blocking
6 from __future__ import with_statement
8 __zmq__ = __import__('zmq')
9 from eventlet import hubs
10 from eventlet.patcher import slurp_properties
11 from eventlet.support import greenlets as greenlet
13 __patched__ = ['Context', 'Socket']
14 slurp_properties(__zmq__, globals(), ignore=__patched__)
16 from collections import deque
19 # alias XREQ/XREP to DEALER/ROUTER if available
20 if not hasattr(__zmq__, 'XREQ'):
22 if not hasattr(__zmq__, 'XREP'):
28 class LockReleaseError(Exception):
32 class _QueueLock(object):
33 """A Lock that can be acquired by at most one thread. Any other
34 thread calling acquire will be blocked in a queue. When release
35 is called, the threads are awoken in the order they blocked,
36 one at a time. This lock can be required recursively by the same
40 self._waiters = deque()
43 self._hub = hubs.get_hub()
45 def __nonzero__(self):
46 return bool(self._count)
48 __bool__ = __nonzero__
53 def __exit__(self, type, value, traceback):
57 current = greenlet.getcurrent()
58 if (self._waiters or self._count > 0) and self._holder is not current:
59 # block until lock is free
60 self._waiters.append(current)
62 w = self._waiters.popleft()
64 assert w is current, 'Waiting threads woken out of order'
65 assert self._count == 0, 'After waking a thread, the lock must be unacquired'
67 self._holder = current
72 raise LockReleaseError("Cannot release unacquired lock")
79 self._hub.schedule_call_global(0, self._waiters[0].switch)
82 class _BlockedThread(object):
83 """Is either empty, or represents a single blocked thread that
84 blocked itself by calling the block() method. The thread can be
85 awoken by calling wake(). Wake() can be called multiple times and
86 all but the first call will have no effect."""
89 self._blocked_thread = None
90 self._wakeupper = None
91 self._hub = hubs.get_hub()
93 def __nonzero__(self):
94 return self._blocked_thread is not None
96 __bool__ = __nonzero__
99 if self._blocked_thread is not None:
100 raise Exception("Cannot block more than one thread on one BlockedThread")
101 self._blocked_thread = greenlet.getcurrent()
106 self._blocked_thread = None
107 # cleanup the wakeup task
108 if self._wakeupper is not None:
109 # Important to cancel the wakeup task so it doesn't
110 # spuriously wake this greenthread later on.
111 self._wakeupper.cancel()
112 self._wakeupper = None
115 """Schedules the blocked thread to be awoken and return
116 True. If wake has already been called or if there is no
117 blocked thread, then this call has no effect and returns
119 if self._blocked_thread is not None and self._wakeupper is None:
120 self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
125 class Context(__zmq__.Context):
126 """Subclass of :class:`zmq.core.context.Context`
129 def socket(self, socket_type):
130 """Overridden method to ensure that the green version of socket is used
132 Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
133 that a :class:`Socket` with all of its send and recv methods set to be
134 non-blocking is returned
137 raise ZMQError(ENOTSUP)
138 return Socket(self, socket_type)
141 def _wraps(source_fn):
142 """A decorator that copies the __name__ and __doc__ from the given
145 def wrapper(dest_fn):
146 dest_fn.__name__ = source_fn.__name__
147 dest_fn.__doc__ = source_fn.__doc__
151 # Implementation notes: Each socket in 0mq contains a pipe that the
152 # background IO threads use to communicate with the socket. These
153 # events are important because they tell the socket when it is able to
154 # send and when it has messages waiting to be received. The read end
155 # of the events pipe is the same FD that getsockopt(zmq.FD) returns.
157 # Events are read from the socket's event pipe only on the thread that
158 # the 0mq context is associated with, which is the native thread the
159 # greenthreads are running on, and the only operations that cause the
160 # events to be read and processed are send(), recv() and
161 # getsockopt(zmq.EVENTS). This means that after doing any of these
162 # three operations, the ability of the socket to send or receive a
163 # message without blocking may have changed, but after the events are
164 # read the FD is no longer readable so the hub may not signal our
167 # If we understand that after calling send() a message might be ready
168 # to be received and that after calling recv() a message might be able
169 # to be sent, what should we do next? There are two approaches:
171 # 1. Always wake the other thread if there is one waiting. This
172 # wakeup may be spurious because the socket might not actually be
173 # ready for a send() or recv(). However, if a thread is in a
174 # tight-loop successfully calling send() or recv() then the wakeups
175 # are naturally batched and there's very little cost added to each
180 # 2. Call getsockopt(zmq.EVENTS) and explicitly check if the other
181 # thread should be woken up. This avoids spurious wake-ups but may
182 # add overhead because getsockopt will cause all events to be
183 # processed, whereas send and recv throttle processing
184 # events. Admittedly, all of the events will need to be processed
185 # eventually, but it is likely faster to batch the processing.
187 # Which approach is better? I have no idea.
190 # - Support MessageTrackers and make MessageTracker.wait green
192 _Socket = __zmq__.Socket
193 _Socket_recv = _Socket.recv
194 _Socket_send = _Socket.send
195 _Socket_send_multipart = _Socket.send_multipart
196 _Socket_recv_multipart = _Socket.recv_multipart
197 _Socket_getsockopt = _Socket.getsockopt
200 class Socket(_Socket):
201 """Green version of :class:`zmq.core.socket.Socket
203 The following three methods are always overridden:
207 To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or receiving
208 is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
209 ``zmq.EAGAIN`` (retry) error is raised
211 For some socket types, the following methods are also overridden:
216 def __init__(self, context, socket_type):
217 super(Socket, self).__init__(context, socket_type)
219 self.__dict__['_eventlet_send_event'] = _BlockedThread()
220 self.__dict__['_eventlet_recv_event'] = _BlockedThread()
221 self.__dict__['_eventlet_send_lock'] = _QueueLock()
222 self.__dict__['_eventlet_recv_lock'] = _QueueLock()
225 # Some events arrived at the zmq socket. This may mean
226 # there's a message that can be read or there's space for
227 # a message to be written.
228 send_wake = self._eventlet_send_event.wake()
229 recv_wake = self._eventlet_recv_event.wake()
230 if not send_wake and not recv_wake:
231 # if no waiting send or recv thread was woken up, then
232 # force the zmq socket's events to be processed to
233 # avoid repeated wakeups
234 _Socket_getsockopt(self, EVENTS)
237 self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
243 @_wraps(_Socket.close)
244 def close(self, linger=None):
245 super(Socket, self).close(linger)
246 if self._eventlet_listener is not None:
247 hubs.get_hub().remove(self._eventlet_listener)
248 self.__dict__['_eventlet_listener'] = None
249 # wake any blocked threads
250 self._eventlet_send_event.wake()
251 self._eventlet_recv_event.wake()
253 @_wraps(_Socket.getsockopt)
254 def getsockopt(self, option):
255 result = _Socket_getsockopt(self, option)
257 # Getting the events causes the zmq socket to process
258 # events which may mean a msg can be sent or received. If
259 # there is a greenthread blocked and waiting for events,
260 # it will miss the edge-triggered read event, so wake it
262 if (result & POLLOUT):
263 self._eventlet_send_event.wake()
264 if (result & POLLIN):
265 self._eventlet_recv_event.wake()
268 @_wraps(_Socket.send)
269 def send(self, msg, flags=0, copy=True, track=False):
270 """A send method that's safe to use when multiple greenthreads
271 are calling send, send_multipart, recv and recv_multipart on
275 result = _Socket_send(self, msg, flags, copy, track)
276 # Instead of calling both wake methods, could call
277 # self.getsockopt(EVENTS) which would trigger wakeups if
279 self._eventlet_send_event.wake()
280 self._eventlet_recv_event.wake()
283 # TODO: pyzmq will copy the message buffer and create Message
284 # objects under some circumstances. We could do that work here
285 # once to avoid doing it every time the send is retried.
287 with self._eventlet_send_lock:
290 return _Socket_send(self, msg, flags, copy, track)
291 except ZMQError as e:
292 if e.errno == EAGAIN:
293 self._eventlet_send_event.block()
297 # The call to send processes 0mq events and may
298 # make the socket ready to recv. Wake the next
299 # receiver. (Could check EVENTS for POLLIN here)
300 self._eventlet_recv_event.wake()
302 @_wraps(_Socket.send_multipart)
303 def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
304 """A send_multipart method that's safe to use when multiple
305 greenthreads are calling send, send_multipart, recv and
306 recv_multipart on the same socket.
309 return _Socket_send_multipart(self, msg_parts, flags, copy, track)
311 # acquire lock here so the subsequent calls to send for the
312 # message parts after the first don't block
313 with self._eventlet_send_lock:
314 return _Socket_send_multipart(self, msg_parts, flags, copy, track)
316 @_wraps(_Socket.recv)
317 def recv(self, flags=0, copy=True, track=False):
318 """A recv method that's safe to use when multiple greenthreads
319 are calling send, send_multipart, recv and recv_multipart on
323 msg = _Socket_recv(self, flags, copy, track)
324 # Instead of calling both wake methods, could call
325 # self.getsockopt(EVENTS) which would trigger wakeups if
327 self._eventlet_send_event.wake()
328 self._eventlet_recv_event.wake()
332 with self._eventlet_recv_lock:
335 return _Socket_recv(self, flags, copy, track)
336 except ZMQError as e:
337 if e.errno == EAGAIN:
338 self._eventlet_recv_event.block()
342 # The call to recv processes 0mq events and may
343 # make the socket ready to send. Wake the next
344 # receiver. (Could check EVENTS for POLLOUT here)
345 self._eventlet_send_event.wake()
347 @_wraps(_Socket.recv_multipart)
348 def recv_multipart(self, flags=0, copy=True, track=False):
349 """A recv_multipart method that's safe to use when multiple
350 greenthreads are calling send, send_multipart, recv and
351 recv_multipart on the same socket.
354 return _Socket_recv_multipart(self, flags, copy, track)
356 # acquire lock here so the subsequent calls to recv for the
357 # message parts after the first don't block
358 with self._eventlet_recv_lock:
359 return _Socket_recv_multipart(self, flags, copy, track)