1 from __future__ import with_statement
3 from eventlet import event, spawn, sleep, semaphore
4 from nose.tools import *
5 from tests import check_idle_cpu_usage, LimitedTestCase, using_pyevent, skip_unless
8 from eventlet.green import zmq
10 zmq = {} # for systems lacking zmq, skips tests instead of barfing
12 RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK)
20 return not using_pyevent(_)
23 class TestUpstreamDownStream(LimitedTestCase):
24 @skip_unless(zmq_supported)
26 super(TestUpstreamDownStream, self).setUp()
27 self.context = zmq.Context()
30 @skip_unless(zmq_supported)
32 self.clear_up_sockets()
33 super(TestUpstreamDownStream, self).tearDown()
35 def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
36 """Create a bound socket pair using a random port."""
37 s1 = self.context.socket(type1)
38 port = s1.bind_to_random_port(interface)
39 s2 = self.context.socket(type2)
40 s2.connect('%s:%s' % (interface, port))
41 self.sockets.append(s1)
42 self.sockets.append(s2)
45 def clear_up_sockets(self):
46 for sock in self.sockets:
49 self.context.destroy(0)
51 def assertRaisesErrno(self, errnos, func, *args):
54 except zmq.ZMQError as e:
55 if not hasattr(errnos, '__iter__'):
58 if e.errno not in errnos:
60 "wrong error raised, expected one of ['%s'], got '%s'" % (
61 ", ".join("%s" % zmq.ZMQError(errno) for errno in errnos),
66 self.fail("Function did not raise any error")
68 @skip_unless(zmq_supported)
69 def test_close_linger(self):
70 """Socket.close() must support linger argument.
72 https://github.com/eventlet/eventlet/issues/9
74 sock1, sock2, _ = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
78 @skip_unless(zmq_supported)
79 def test_recv_spawned_before_send_is_non_blocking(self):
80 req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
88 msg['res'] = rep.recv()
94 self.assertEqual(msg['res'], b'test')
96 @skip_unless(zmq_supported)
97 def test_close_socket_raises_enotsup(self):
98 req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
102 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
103 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
105 @skip_unless(zmq_supported)
106 def test_close_xsocket_raises_enotsup(self):
107 req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP)
111 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
112 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
114 @skip_unless(zmq_supported)
115 def test_send_1k_req_rep(self):
116 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
122 req.send(str(tx_i).encode())
123 while req.recv() != b'done':
125 req.send(str(tx_i).encode())
137 final_i = done.wait()
138 self.assertEqual(final_i, 0)
140 @skip_unless(zmq_supported)
141 def test_send_1k_push_pull(self):
142 down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
151 down.send(str(tx_i).encode())
161 final_i = done.wait()
162 self.assertEqual(final_i, 0)
164 @skip_unless(zmq_supported)
165 def test_send_1k_pub_sub(self):
166 pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
167 sub1 = self.context.socket(zmq.SUB)
168 sub2 = self.context.socket(zmq.SUB)
169 self.sockets.extend([sub1, sub2])
170 addr = 'tcp://127.0.0.1:%s' % port
173 sub_all.setsockopt(zmq.SUBSCRIBE, b'')
174 sub1.setsockopt(zmq.SUBSCRIBE, b'sub1')
175 sub2.setsockopt(zmq.SUBSCRIBE, b'sub2')
177 sub_all_done = event.Event()
178 sub1_done = event.Event()
179 sub2_done = event.Event()
183 def rx(sock, done_evt, msg_count=10000):
185 while count < msg_count:
195 for i in range(1, 1001):
196 msg = ("sub%s %s" % ([2, 1][i % 2], i)).encode()
199 sock.send(b'sub1 LAST')
200 sock.send(b'sub2 LAST')
202 spawn(rx, sub_all, sub_all_done)
203 spawn(rx, sub1, sub1_done)
204 spawn(rx, sub2, sub2_done)
206 sub1_count = sub1_done.wait()
207 sub2_count = sub2_done.wait()
208 sub_all_count = sub_all_done.wait()
209 self.assertEqual(sub1_count, 500)
210 self.assertEqual(sub2_count, 500)
211 self.assertEqual(sub_all_count, 1000)
213 @skip_unless(zmq_supported)
214 def test_change_subscription(self):
215 pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
216 sub.setsockopt(zmq.SUBSCRIBE, b'test')
219 sub_done = event.Event()
221 def rx(sock, done_evt):
229 if b'LAST' in msg and sub == b'test':
230 sock.setsockopt(zmq.UNSUBSCRIBE, b'test')
231 sock.setsockopt(zmq.SUBSCRIBE, b'done')
237 for i in range(1, 101):
238 msg = ("test %s" % i).encode()
242 sock.send(b'test LAST')
244 sock.send(b'done DONE')
246 spawn(rx, sub, sub_done)
249 rx_count = sub_done.wait()
250 self.assertEqual(rx_count, 50)
252 @skip_unless(zmq_supported)
253 def test_recv_multipart_bug68(self):
254 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
256 req.send_multipart(msg)
257 recieved_msg = rep.recv_multipart()
258 self.assertEqual(recieved_msg, msg)
260 # Send a message back the other way
262 rep.send_multipart(msg2, copy=False)
263 # When receiving a copy it's a zmq.core.message.Message you get back
264 recieved_msg = req.recv_multipart(copy=False)
265 # So it needs to be converted to a string
266 # I'm calling str(m) consciously here; Message has a .data attribute
267 # but it's private __str__ appears to be the way to go
268 self.assertEqual([m.bytes for m in recieved_msg], msg2)
270 @skip_unless(zmq_supported)
271 def test_recv_noblock_bug76(self):
272 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
273 self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
274 self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
276 @skip_unless(zmq_supported)
277 def test_send_during_recv(self):
278 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
282 done_evts = [event.Event() for _ in range(num_recvs)]
284 def slow_rx(done, msg):
285 self.assertEqual(sender.recv(), msg)
291 sender.send(str(tx_i).encode())
296 rx_i = receiver.recv()
298 for i in range(num_recvs):
299 receiver.send(('done%d' % i).encode())
303 for i in range(num_recvs):
304 spawn(slow_rx, done_evts[i], ("done%d" % i).encode())
308 for evt in done_evts:
309 self.assertEqual(evt.wait(), 0)
311 @skip_unless(zmq_supported)
312 def test_send_during_recv_multipart(self):
313 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
317 done_evts = [event.Event() for _ in range(num_recvs)]
319 def slow_rx(done, msg):
320 self.assertEqual(sender.recv_multipart(), msg)
326 sender.send_multipart([str(tx_i).encode(), b'1', b'2', b'3'])
331 rx_i = receiver.recv_multipart()
332 if rx_i == [b"1000", b'1', b'2', b'3']:
333 for i in range(num_recvs):
334 receiver.send_multipart([
335 ('done%d' % i).encode(), b'a', b'b', b'c'])
339 for i in range(num_recvs):
340 spawn(slow_rx, done_evts[i], [
341 ("done%d" % i).encode(), b'a', b'b', b'c'])
345 for i in range(num_recvs):
346 final_i = done_evts[i].wait()
347 self.assertEqual(final_i, 0)
349 # Need someway to ensure a thread is blocked on send... This isn't working
350 @skip_unless(zmq_supported)
351 def test_recv_during_send(self):
352 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
359 except AttributeError:
363 sender.setsockopt(SNDHWM, 10)
364 sender.setsockopt(zmq.SNDBUF, 10)
366 receiver.setsockopt(zmq.RCVBUF, 10)
371 sender.send(str(tx_i).encode())
376 final_i = done.wait()
377 self.assertEqual(final_i, 0)
379 @skip_unless(zmq_supported)
380 def test_close_during_recv(self):
381 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
383 done1 = event.Event()
384 done2 = event.Event()
387 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, receiver.recv)
399 @skip_unless(zmq_supported)
400 def test_getsockopt_events(self):
401 sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
403 poll_out = zmq.Poller()
404 poll_out.register(sock1, zmq.POLLOUT)
405 sock_map = poll_out.poll(100)
406 self.assertEqual(len(sock_map), 1)
407 events = sock1.getsockopt(zmq.EVENTS)
408 self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
411 poll_in = zmq.Poller()
412 poll_in.register(sock2, zmq.POLLIN)
413 sock_map = poll_in.poll(100)
414 self.assertEqual(len(sock_map), 1)
415 events = sock2.getsockopt(zmq.EVENTS)
416 self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
418 @skip_unless(zmq_supported)
419 def test_cpu_usage_after_bind(self):
420 """zmq eats CPU after PUB socket .bind()
422 https://bitbucket.org/eventlet/eventlet/issue/128
424 According to the ZeroMQ documentation, the socket file descriptor
425 can be readable without any pending messages. So we need to ensure
426 that Eventlet wraps around ZeroMQ sockets do not create busy loops.
428 A naive way to test it is to measure resource usage. This will require
429 some tuning to set appropriate acceptable limits.
431 sock = self.context.socket(zmq.PUB)
432 self.sockets.append(sock)
433 sock.bind_to_random_port("tcp://127.0.0.1")
435 check_idle_cpu_usage(0.2, 0.1)
437 @skip_unless(zmq_supported)
438 def test_cpu_usage_after_pub_send_or_dealer_recv(self):
439 """zmq eats CPU after PUB send or DEALER recv.
441 Same https://bitbucket.org/eventlet/eventlet/issue/128
443 pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
444 sub.setsockopt(zmq.SUBSCRIBE, b"")
446 pub.send(b'test_send')
447 check_idle_cpu_usage(0.2, 0.1)
449 sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
451 sender.send(b'test_recv')
452 msg = receiver.recv()
453 self.assertEqual(msg, b'test_recv')
454 check_idle_cpu_usage(0.2, 0.1)
457 class TestQueueLock(LimitedTestCase):
458 @skip_unless(zmq_supported)
459 def test_queue_lock_order(self):
461 s = semaphore.Semaphore(0)
478 self.assertEqual(results, [])
483 self.assertEqual(results, [1, 2, 3])
485 @skip_unless(zmq_supported)
486 def test_count(self):
498 @skip_unless(zmq_supported)
499 def test_errors(self):
502 self.assertRaises(zmq.LockReleaseError, q.release)
507 self.assertRaises(zmq.LockReleaseError, q.release)
509 @skip_unless(zmq_supported)
510 def test_nested_acquire(self):
516 s = semaphore.Semaphore(0)
526 self.assertEqual(results, [])
529 self.assertEqual(results, [])
534 self.assertEqual(results, [1])
537 class TestBlockedThread(LimitedTestCase):
538 @skip_unless(zmq_supported)
539 def test_block(self):
540 e = zmq._BlockedThread()
551 self.assertFalse(done.has_result())