Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / zmq_test.py
1 from __future__ import with_statement
2
3 from eventlet import event, spawn, sleep, semaphore
4 from nose.tools import *
5 from tests import check_idle_cpu_usage, LimitedTestCase, using_pyevent, skip_unless
6
7 try:
8     from eventlet.green import zmq
9 except ImportError:
10     zmq = {}    # for systems lacking zmq, skips tests instead of barfing
11 else:
12     RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK)
13
14
15 def zmq_supported(_):
16     try:
17         import zmq
18     except ImportError:
19         return False
20     return not using_pyevent(_)
21
22
23 class TestUpstreamDownStream(LimitedTestCase):
24     @skip_unless(zmq_supported)
25     def setUp(self):
26         super(TestUpstreamDownStream, self).setUp()
27         self.context = zmq.Context()
28         self.sockets = []
29
30     @skip_unless(zmq_supported)
31     def tearDown(self):
32         self.clear_up_sockets()
33         super(TestUpstreamDownStream, self).tearDown()
34
35     def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
36         """Create a bound socket pair using a random port."""
37         s1 = self.context.socket(type1)
38         port = s1.bind_to_random_port(interface)
39         s2 = self.context.socket(type2)
40         s2.connect('%s:%s' % (interface, port))
41         self.sockets.append(s1)
42         self.sockets.append(s2)
43         return s1, s2, port
44
45     def clear_up_sockets(self):
46         for sock in self.sockets:
47             sock.close()
48         self.sockets = None
49         self.context.destroy(0)
50
51     def assertRaisesErrno(self, errnos, func, *args):
52         try:
53             func(*args)
54         except zmq.ZMQError as e:
55             if not hasattr(errnos, '__iter__'):
56                 errnos = (errnos,)
57
58             if e.errno not in errnos:
59                 raise AssertionError(
60                     "wrong error raised, expected one of ['%s'], got '%s'" % (
61                         ", ".join("%s" % zmq.ZMQError(errno) for errno in errnos),
62                         zmq.ZMQError(e.errno)
63                     ),
64                 )
65         else:
66             self.fail("Function did not raise any error")
67
68     @skip_unless(zmq_supported)
69     def test_close_linger(self):
70         """Socket.close() must support linger argument.
71
72         https://github.com/eventlet/eventlet/issues/9
73         """
74         sock1, sock2, _ = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
75         sock1.close(1)
76         sock2.close(linger=0)
77
78     @skip_unless(zmq_supported)
79     def test_recv_spawned_before_send_is_non_blocking(self):
80         req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
81 #       req.connect(ipc)
82 #       rep.bind(ipc)
83         sleep()
84         msg = dict(res=None)
85         done = event.Event()
86
87         def rx():
88             msg['res'] = rep.recv()
89             done.send('done')
90
91         spawn(rx)
92         req.send(b'test')
93         done.wait()
94         self.assertEqual(msg['res'], b'test')
95
96     @skip_unless(zmq_supported)
97     def test_close_socket_raises_enotsup(self):
98         req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
99
100         rep.close()
101         req.close()
102         self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
103         self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
104
105     @skip_unless(zmq_supported)
106     def test_close_xsocket_raises_enotsup(self):
107         req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP)
108
109         rep.close()
110         req.close()
111         self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
112         self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
113
114     @skip_unless(zmq_supported)
115     def test_send_1k_req_rep(self):
116         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
117         sleep()
118         done = event.Event()
119
120         def tx():
121             tx_i = 0
122             req.send(str(tx_i).encode())
123             while req.recv() != b'done':
124                 tx_i += 1
125                 req.send(str(tx_i).encode())
126             done.send(0)
127
128         def rx():
129             while True:
130                 rx_i = rep.recv()
131                 if rx_i == b"1000":
132                     rep.send(b'done')
133                     break
134                 rep.send(b'i')
135         spawn(tx)
136         spawn(rx)
137         final_i = done.wait()
138         self.assertEqual(final_i, 0)
139
140     @skip_unless(zmq_supported)
141     def test_send_1k_push_pull(self):
142         down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
143         sleep()
144
145         done = event.Event()
146
147         def tx():
148             tx_i = 0
149             while tx_i <= 1000:
150                 tx_i += 1
151                 down.send(str(tx_i).encode())
152
153         def rx():
154             while True:
155                 rx_i = up.recv()
156                 if rx_i == b"1000":
157                     done.send(0)
158                     break
159         spawn(tx)
160         spawn(rx)
161         final_i = done.wait()
162         self.assertEqual(final_i, 0)
163
164     @skip_unless(zmq_supported)
165     def test_send_1k_pub_sub(self):
166         pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
167         sub1 = self.context.socket(zmq.SUB)
168         sub2 = self.context.socket(zmq.SUB)
169         self.sockets.extend([sub1, sub2])
170         addr = 'tcp://127.0.0.1:%s' % port
171         sub1.connect(addr)
172         sub2.connect(addr)
173         sub_all.setsockopt(zmq.SUBSCRIBE, b'')
174         sub1.setsockopt(zmq.SUBSCRIBE, b'sub1')
175         sub2.setsockopt(zmq.SUBSCRIBE, b'sub2')
176
177         sub_all_done = event.Event()
178         sub1_done = event.Event()
179         sub2_done = event.Event()
180
181         sleep(0.2)
182
183         def rx(sock, done_evt, msg_count=10000):
184             count = 0
185             while count < msg_count:
186                 msg = sock.recv()
187                 sleep()
188                 if b'LAST' in msg:
189                     break
190                 count += 1
191
192             done_evt.send(count)
193
194         def tx(sock):
195             for i in range(1, 1001):
196                 msg = ("sub%s %s" % ([2, 1][i % 2], i)).encode()
197                 sock.send(msg)
198                 sleep()
199             sock.send(b'sub1 LAST')
200             sock.send(b'sub2 LAST')
201
202         spawn(rx, sub_all, sub_all_done)
203         spawn(rx, sub1, sub1_done)
204         spawn(rx, sub2, sub2_done)
205         spawn(tx, pub)
206         sub1_count = sub1_done.wait()
207         sub2_count = sub2_done.wait()
208         sub_all_count = sub_all_done.wait()
209         self.assertEqual(sub1_count, 500)
210         self.assertEqual(sub2_count, 500)
211         self.assertEqual(sub_all_count, 1000)
212
213     @skip_unless(zmq_supported)
214     def test_change_subscription(self):
215         pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
216         sub.setsockopt(zmq.SUBSCRIBE, b'test')
217
218         sleep(0.2)
219         sub_done = event.Event()
220
221         def rx(sock, done_evt):
222             count = 0
223             sub = b'test'
224             while True:
225                 msg = sock.recv()
226                 sleep()
227                 if b'DONE' in msg:
228                     break
229                 if b'LAST' in msg and sub == b'test':
230                     sock.setsockopt(zmq.UNSUBSCRIBE, b'test')
231                     sock.setsockopt(zmq.SUBSCRIBE, b'done')
232                     sub = b'done'
233                 count += 1
234             done_evt.send(count)
235
236         def tx(sock):
237             for i in range(1, 101):
238                 msg = ("test %s" % i).encode()
239                 if i != 50:
240                     sock.send(msg)
241                 else:
242                     sock.send(b'test LAST')
243                 sleep()
244             sock.send(b'done DONE')
245
246         spawn(rx, sub, sub_done)
247         spawn(tx, pub)
248
249         rx_count = sub_done.wait()
250         self.assertEqual(rx_count, 50)
251
252     @skip_unless(zmq_supported)
253     def test_recv_multipart_bug68(self):
254         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
255         msg = [b'']
256         req.send_multipart(msg)
257         recieved_msg = rep.recv_multipart()
258         self.assertEqual(recieved_msg, msg)
259
260         # Send a message back the other way
261         msg2 = [b""]
262         rep.send_multipart(msg2, copy=False)
263         # When receiving a copy it's a zmq.core.message.Message you get back
264         recieved_msg = req.recv_multipart(copy=False)
265         # So it needs to be converted to a string
266         # I'm calling str(m) consciously here; Message has a .data attribute
267         # but it's private __str__ appears to be the way to go
268         self.assertEqual([m.bytes for m in recieved_msg], msg2)
269
270     @skip_unless(zmq_supported)
271     def test_recv_noblock_bug76(self):
272         req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
273         self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
274         self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
275
276     @skip_unless(zmq_supported)
277     def test_send_during_recv(self):
278         sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
279         sleep()
280
281         num_recvs = 30
282         done_evts = [event.Event() for _ in range(num_recvs)]
283
284         def slow_rx(done, msg):
285             self.assertEqual(sender.recv(), msg)
286             done.send(0)
287
288         def tx():
289             tx_i = 0
290             while tx_i <= 1000:
291                 sender.send(str(tx_i).encode())
292                 tx_i += 1
293
294         def rx():
295             while True:
296                 rx_i = receiver.recv()
297                 if rx_i == b"1000":
298                     for i in range(num_recvs):
299                         receiver.send(('done%d' % i).encode())
300                     sleep()
301                     return
302
303         for i in range(num_recvs):
304             spawn(slow_rx, done_evts[i], ("done%d" % i).encode())
305
306         spawn(tx)
307         spawn(rx)
308         for evt in done_evts:
309             self.assertEqual(evt.wait(), 0)
310
311     @skip_unless(zmq_supported)
312     def test_send_during_recv_multipart(self):
313         sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
314         sleep()
315
316         num_recvs = 30
317         done_evts = [event.Event() for _ in range(num_recvs)]
318
319         def slow_rx(done, msg):
320             self.assertEqual(sender.recv_multipart(), msg)
321             done.send(0)
322
323         def tx():
324             tx_i = 0
325             while tx_i <= 1000:
326                 sender.send_multipart([str(tx_i).encode(), b'1', b'2', b'3'])
327                 tx_i += 1
328
329         def rx():
330             while True:
331                 rx_i = receiver.recv_multipart()
332                 if rx_i == [b"1000", b'1', b'2', b'3']:
333                     for i in range(num_recvs):
334                         receiver.send_multipart([
335                             ('done%d' % i).encode(), b'a', b'b', b'c'])
336                     sleep()
337                     return
338
339         for i in range(num_recvs):
340             spawn(slow_rx, done_evts[i], [
341                 ("done%d" % i).encode(), b'a', b'b', b'c'])
342
343         spawn(tx)
344         spawn(rx)
345         for i in range(num_recvs):
346             final_i = done_evts[i].wait()
347             self.assertEqual(final_i, 0)
348
349     # Need someway to ensure a thread is blocked on send... This isn't working
350     @skip_unless(zmq_supported)
351     def test_recv_during_send(self):
352         sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
353         sleep()
354
355         done = event.Event()
356
357         try:
358             SNDHWM = zmq.SNDHWM
359         except AttributeError:
360             # ZeroMQ <3.0
361             SNDHWM = zmq.HWM
362
363         sender.setsockopt(SNDHWM, 10)
364         sender.setsockopt(zmq.SNDBUF, 10)
365
366         receiver.setsockopt(zmq.RCVBUF, 10)
367
368         def tx():
369             tx_i = 0
370             while tx_i <= 1000:
371                 sender.send(str(tx_i).encode())
372                 tx_i += 1
373             done.send(0)
374
375         spawn(tx)
376         final_i = done.wait()
377         self.assertEqual(final_i, 0)
378
379     @skip_unless(zmq_supported)
380     def test_close_during_recv(self):
381         sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
382         sleep()
383         done1 = event.Event()
384         done2 = event.Event()
385
386         def rx(e):
387             self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, receiver.recv)
388             e.send()
389
390         spawn(rx, done1)
391         spawn(rx, done2)
392
393         sleep()
394         receiver.close()
395
396         done1.wait()
397         done2.wait()
398
399     @skip_unless(zmq_supported)
400     def test_getsockopt_events(self):
401         sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
402         sleep()
403         poll_out = zmq.Poller()
404         poll_out.register(sock1, zmq.POLLOUT)
405         sock_map = poll_out.poll(100)
406         self.assertEqual(len(sock_map), 1)
407         events = sock1.getsockopt(zmq.EVENTS)
408         self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
409         sock1.send(b'')
410
411         poll_in = zmq.Poller()
412         poll_in.register(sock2, zmq.POLLIN)
413         sock_map = poll_in.poll(100)
414         self.assertEqual(len(sock_map), 1)
415         events = sock2.getsockopt(zmq.EVENTS)
416         self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
417
418     @skip_unless(zmq_supported)
419     def test_cpu_usage_after_bind(self):
420         """zmq eats CPU after PUB socket .bind()
421
422         https://bitbucket.org/eventlet/eventlet/issue/128
423
424         According to the ZeroMQ documentation, the socket file descriptor
425         can be readable without any pending messages. So we need to ensure
426         that Eventlet wraps around ZeroMQ sockets do not create busy loops.
427
428         A naive way to test it is to measure resource usage. This will require
429         some tuning to set appropriate acceptable limits.
430         """
431         sock = self.context.socket(zmq.PUB)
432         self.sockets.append(sock)
433         sock.bind_to_random_port("tcp://127.0.0.1")
434         sleep()
435         check_idle_cpu_usage(0.2, 0.1)
436
437     @skip_unless(zmq_supported)
438     def test_cpu_usage_after_pub_send_or_dealer_recv(self):
439         """zmq eats CPU after PUB send or DEALER recv.
440
441         Same https://bitbucket.org/eventlet/eventlet/issue/128
442         """
443         pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
444         sub.setsockopt(zmq.SUBSCRIBE, b"")
445         sleep()
446         pub.send(b'test_send')
447         check_idle_cpu_usage(0.2, 0.1)
448
449         sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
450         sleep()
451         sender.send(b'test_recv')
452         msg = receiver.recv()
453         self.assertEqual(msg, b'test_recv')
454         check_idle_cpu_usage(0.2, 0.1)
455
456
457 class TestQueueLock(LimitedTestCase):
458     @skip_unless(zmq_supported)
459     def test_queue_lock_order(self):
460         q = zmq._QueueLock()
461         s = semaphore.Semaphore(0)
462         results = []
463
464         def lock(x):
465             with q:
466                 results.append(x)
467             s.release()
468
469         q.acquire()
470
471         spawn(lock, 1)
472         sleep()
473         spawn(lock, 2)
474         sleep()
475         spawn(lock, 3)
476         sleep()
477
478         self.assertEqual(results, [])
479         q.release()
480         s.acquire()
481         s.acquire()
482         s.acquire()
483         self.assertEqual(results, [1, 2, 3])
484
485     @skip_unless(zmq_supported)
486     def test_count(self):
487         q = zmq._QueueLock()
488         self.assertFalse(q)
489         q.acquire()
490         self.assertTrue(q)
491         q.release()
492         self.assertFalse(q)
493
494         with q:
495             self.assertTrue(q)
496         self.assertFalse(q)
497
498     @skip_unless(zmq_supported)
499     def test_errors(self):
500         q = zmq._QueueLock()
501
502         self.assertRaises(zmq.LockReleaseError, q.release)
503
504         q.acquire()
505         q.release()
506
507         self.assertRaises(zmq.LockReleaseError, q.release)
508
509     @skip_unless(zmq_supported)
510     def test_nested_acquire(self):
511         q = zmq._QueueLock()
512         self.assertFalse(q)
513         q.acquire()
514         q.acquire()
515
516         s = semaphore.Semaphore(0)
517         results = []
518
519         def lock(x):
520             with q:
521                 results.append(x)
522             s.release()
523
524         spawn(lock, 1)
525         sleep()
526         self.assertEqual(results, [])
527         q.release()
528         sleep()
529         self.assertEqual(results, [])
530         self.assertTrue(q)
531         q.release()
532
533         s.acquire()
534         self.assertEqual(results, [1])
535
536
537 class TestBlockedThread(LimitedTestCase):
538     @skip_unless(zmq_supported)
539     def test_block(self):
540         e = zmq._BlockedThread()
541         done = event.Event()
542         self.assertFalse(e)
543
544         def block():
545             e.block()
546             done.send(1)
547
548         spawn(block)
549         sleep()
550
551         self.assertFalse(done.has_result())
552         e.wake()
553         done.wait()