Adjust the package revision; no actual code changes
[packages/trusty/python-eventlet.git] / eventlet / eventlet / green / zmq.py
1 """The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
2 """
3
4 from __future__ import with_statement
5
6 __zmq__ = __import__('zmq')
7 from eventlet import hubs
8 from eventlet.patcher import slurp_properties
9 from eventlet.support import greenlets as greenlet
10
11 __patched__ = ['Context', 'Socket']
12 slurp_properties(__zmq__, globals(), ignore=__patched__)
13
14 from collections import deque
15
16 try:
17     # alias XREQ/XREP to DEALER/ROUTER if available
18     if not hasattr(__zmq__, 'XREQ'):
19         XREQ = DEALER
20     if not hasattr(__zmq__, 'XREP'):
21         XREP = ROUTER
22 except NameError:
23     pass
24
25
26 class LockReleaseError(Exception):
27     pass
28
29
30 class _QueueLock(object):
31     """A Lock that can be acquired by at most one thread. Any other
32     thread calling acquire will be blocked in a queue. When release
33     is called, the threads are awoken in the order they blocked,
34     one at a time. This lock can be required recursively by the same
35     thread."""
36
37     def __init__(self):
38         self._waiters = deque()
39         self._count = 0
40         self._holder = None
41         self._hub = hubs.get_hub()
42
43     def __nonzero__(self):
44         return bool(self._count)
45
46     __bool__ = __nonzero__
47
48     def __enter__(self):
49         self.acquire()
50
51     def __exit__(self, type, value, traceback):
52         self.release()
53
54     def acquire(self):
55         current = greenlet.getcurrent()
56         if (self._waiters or self._count > 0) and self._holder is not current:
57             # block until lock is free
58             self._waiters.append(current)
59             self._hub.switch()
60             w = self._waiters.popleft()
61
62             assert w is current, 'Waiting threads woken out of order'
63             assert self._count == 0, 'After waking a thread, the lock must be unacquired'
64
65         self._holder = current
66         self._count += 1
67
68     def release(self):
69         if self._count <= 0:
70             raise LockReleaseError("Cannot release unacquired lock")
71
72         self._count -= 1
73         if self._count == 0:
74             self._holder = None
75             if self._waiters:
76                 # wake next
77                 self._hub.schedule_call_global(0, self._waiters[0].switch)
78
79
80 class _BlockedThread(object):
81     """Is either empty, or represents a single blocked thread that
82     blocked itself by calling the block() method. The thread can be
83     awoken by calling wake(). Wake() can be called multiple times and
84     all but the first call will have no effect."""
85
86     def __init__(self):
87         self._blocked_thread = None
88         self._wakeupper = None
89         self._hub = hubs.get_hub()
90
91     def __nonzero__(self):
92         return self._blocked_thread is not None
93
94     __bool__ = __nonzero__
95
96     def block(self):
97         if self._blocked_thread is not None:
98             raise Exception("Cannot block more than one thread on one BlockedThread")
99         self._blocked_thread = greenlet.getcurrent()
100
101         try:
102             self._hub.switch()
103         finally:
104             self._blocked_thread = None
105             # cleanup the wakeup task
106             if self._wakeupper is not None:
107                 # Important to cancel the wakeup task so it doesn't
108                 # spuriously wake this greenthread later on.
109                 self._wakeupper.cancel()
110                 self._wakeupper = None
111
112     def wake(self):
113         """Schedules the blocked thread to be awoken and return
114         True. If wake has already been called or if there is no
115         blocked thread, then this call has no effect and returns
116         False."""
117         if self._blocked_thread is not None and self._wakeupper is None:
118             self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
119             return True
120         return False
121
122
123 class Context(__zmq__.Context):
124     """Subclass of :class:`zmq.core.context.Context`
125     """
126
127     def socket(self, socket_type):
128         """Overridden method to ensure that the green version of socket is used
129
130         Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
131         that a :class:`Socket` with all of its send and recv methods set to be
132         non-blocking is returned
133         """
134         if self.closed:
135             raise ZMQError(ENOTSUP)
136         return Socket(self, socket_type)
137
138
139 def _wraps(source_fn):
140     """A decorator that copies the __name__ and __doc__ from the given
141     function
142     """
143     def wrapper(dest_fn):
144         dest_fn.__name__ = source_fn.__name__
145         dest_fn.__doc__ = source_fn.__doc__
146         return dest_fn
147     return wrapper
148
149 # Implementation notes: Each socket in 0mq contains a pipe that the
150 # background IO threads use to communicate with the socket. These
151 # events are important because they tell the socket when it is able to
152 # send and when it has messages waiting to be received. The read end
153 # of the events pipe is the same FD that getsockopt(zmq.FD) returns.
154 #
155 # Events are read from the socket's event pipe only on the thread that
156 # the 0mq context is associated with, which is the native thread the
157 # greenthreads are running on, and the only operations that cause the
158 # events to be read and processed are send(), recv() and
159 # getsockopt(zmq.EVENTS). This means that after doing any of these
160 # three operations, the ability of the socket to send or receive a
161 # message without blocking may have changed, but after the events are
162 # read the FD is no longer readable so the hub may not signal our
163 # listener.
164 #
165 # If we understand that after calling send() a message might be ready
166 # to be received and that after calling recv() a message might be able
167 # to be sent, what should we do next? There are two approaches:
168 #
169 #  1. Always wake the other thread if there is one waiting. This
170 #  wakeup may be spurious because the socket might not actually be
171 #  ready for a send() or recv().  However, if a thread is in a
172 #  tight-loop successfully calling send() or recv() then the wakeups
173 #  are naturally batched and there's very little cost added to each
174 #  send/recv call.
175 #
176 # or
177 #
178 #  2. Call getsockopt(zmq.EVENTS) and explicitly check if the other
179 #  thread should be woken up. This avoids spurious wake-ups but may
180 #  add overhead because getsockopt will cause all events to be
181 #  processed, whereas send and recv throttle processing
182 #  events. Admittedly, all of the events will need to be processed
183 #  eventually, but it is likely faster to batch the processing.
184 #
185 # Which approach is better? I have no idea.
186 #
187 # TODO:
188 # - Support MessageTrackers and make MessageTracker.wait green
189
190 _Socket = __zmq__.Socket
191 _Socket_recv = _Socket.recv
192 _Socket_send = _Socket.send
193 _Socket_send_multipart = _Socket.send_multipart
194 _Socket_recv_multipart = _Socket.recv_multipart
195 _Socket_getsockopt = _Socket.getsockopt
196
197
198 class Socket(_Socket):
199     """Green version of :class:`zmq.core.socket.Socket
200
201     The following three methods are always overridden:
202         * send
203         * recv
204         * getsockopt
205     To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
206     is deferred to the hub (using :func:`eventlet.hubs.trampoline`) if a
207     ``zmq.EAGAIN`` (retry) error is raised
208
209     For some socket types, the following methods are also overridden:
210         * send_multipart
211         * recv_multipart
212     """
213
214     def __init__(self, context, socket_type):
215         super(Socket, self).__init__(context, socket_type)
216
217         self.__dict__['_eventlet_send_event'] = _BlockedThread()
218         self.__dict__['_eventlet_recv_event'] = _BlockedThread()
219         self.__dict__['_eventlet_send_lock'] = _QueueLock()
220         self.__dict__['_eventlet_recv_lock'] = _QueueLock()
221
222         def event(fd):
223             # Some events arrived at the zmq socket. This may mean
224             # there's a message that can be read or there's space for
225             # a message to be written.
226             send_wake = self._eventlet_send_event.wake()
227             recv_wake = self._eventlet_recv_event.wake()
228             if not send_wake and not recv_wake:
229                 # if no waiting send or recv thread was woken up, then
230                 # force the zmq socket's events to be processed to
231                 # avoid repeated wakeups
232                 _Socket_getsockopt(self, EVENTS)
233
234         hub = hubs.get_hub()
235         self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
236                                                       self.getsockopt(FD),
237                                                       event,
238                                                       lambda _: None,
239                                                       lambda: None)
240
241     @_wraps(_Socket.close)
242     def close(self, linger=None):
243         super(Socket, self).close(linger)
244         if self._eventlet_listener is not None:
245             hubs.get_hub().remove(self._eventlet_listener)
246             self.__dict__['_eventlet_listener'] = None
247             # wake any blocked threads
248             self._eventlet_send_event.wake()
249             self._eventlet_recv_event.wake()
250
251     @_wraps(_Socket.getsockopt)
252     def getsockopt(self, option):
253         result = _Socket_getsockopt(self, option)
254         if option == EVENTS:
255             # Getting the events causes the zmq socket to process
256             # events which may mean a msg can be sent or received. If
257             # there is a greenthread blocked and waiting for events,
258             # it will miss the edge-triggered read event, so wake it
259             # up.
260             if (result & POLLOUT):
261                 self._eventlet_send_event.wake()
262             if (result & POLLIN):
263                 self._eventlet_recv_event.wake()
264         return result
265
266     @_wraps(_Socket.send)
267     def send(self, msg, flags=0, copy=True, track=False):
268         """A send method that's safe to use when multiple greenthreads
269         are calling send, send_multipart, recv and recv_multipart on
270         the same socket.
271         """
272         if flags & NOBLOCK:
273             result = _Socket_send(self, msg, flags, copy, track)
274             # Instead of calling both wake methods, could call
275             # self.getsockopt(EVENTS) which would trigger wakeups if
276             # needed.
277             self._eventlet_send_event.wake()
278             self._eventlet_recv_event.wake()
279             return result
280
281         # TODO: pyzmq will copy the message buffer and create Message
282         # objects under some circumstances. We could do that work here
283         # once to avoid doing it every time the send is retried.
284         flags |= NOBLOCK
285         with self._eventlet_send_lock:
286             while True:
287                 try:
288                     return _Socket_send(self, msg, flags, copy, track)
289                 except ZMQError as e:
290                     if e.errno == EAGAIN:
291                         self._eventlet_send_event.block()
292                     else:
293                         raise
294                 finally:
295                     # The call to send processes 0mq events and may
296                     # make the socket ready to recv. Wake the next
297                     # receiver. (Could check EVENTS for POLLIN here)
298                     self._eventlet_recv_event.wake()
299
300     @_wraps(_Socket.send_multipart)
301     def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
302         """A send_multipart method that's safe to use when multiple
303         greenthreads are calling send, send_multipart, recv and
304         recv_multipart on the same socket.
305         """
306         if flags & NOBLOCK:
307             return _Socket_send_multipart(self, msg_parts, flags, copy, track)
308
309         # acquire lock here so the subsequent calls to send for the
310         # message parts after the first don't block
311         with self._eventlet_send_lock:
312             return _Socket_send_multipart(self, msg_parts, flags, copy, track)
313
314     @_wraps(_Socket.recv)
315     def recv(self, flags=0, copy=True, track=False):
316         """A recv method that's safe to use when multiple greenthreads
317         are calling send, send_multipart, recv and recv_multipart on
318         the same socket.
319         """
320         if flags & NOBLOCK:
321             msg = _Socket_recv(self, flags, copy, track)
322             # Instead of calling both wake methods, could call
323             # self.getsockopt(EVENTS) which would trigger wakeups if
324             # needed.
325             self._eventlet_send_event.wake()
326             self._eventlet_recv_event.wake()
327             return msg
328
329         flags |= NOBLOCK
330         with self._eventlet_recv_lock:
331             while True:
332                 try:
333                     return _Socket_recv(self, flags, copy, track)
334                 except ZMQError as e:
335                     if e.errno == EAGAIN:
336                         self._eventlet_recv_event.block()
337                     else:
338                         raise
339                 finally:
340                     # The call to recv processes 0mq events and may
341                     # make the socket ready to send. Wake the next
342                     # receiver. (Could check EVENTS for POLLOUT here)
343                     self._eventlet_send_event.wake()
344
345     @_wraps(_Socket.recv_multipart)
346     def recv_multipart(self, flags=0, copy=True, track=False):
347         """A recv_multipart method that's safe to use when multiple
348         greenthreads are calling send, send_multipart, recv and
349         recv_multipart on the same socket.
350         """
351         if flags & NOBLOCK:
352             return _Socket_recv_multipart(self, flags, copy, track)
353
354         # acquire lock here so the subsequent calls to recv for the
355         # message parts after the first don't block
356         with self._eventlet_recv_lock:
357             return _Socket_recv_multipart(self, flags, copy, track)