]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
a44fb4852857adcab431e00f0d39cd12311cc218
[packages/trusty/rabbitmq-server.git] /
1 package com.rabbitmq.amqp1_0.tests.swiftmq;
2
3 import com.rabbitmq.client.*;
4 import com.swiftmq.amqp.AMQPContext;
5 import com.swiftmq.amqp.v100.client.*;
6 import com.swiftmq.amqp.v100.client.Connection;
7 import com.swiftmq.amqp.v100.client.Consumer;
8 import com.swiftmq.amqp.v100.generated.messaging.message_format.*;
9 import com.swiftmq.amqp.v100.generated.messaging.message_format.Properties;
10 import com.swiftmq.amqp.v100.messaging.AMQPMessage;
11 import com.swiftmq.amqp.v100.types.*;
12 import junit.framework.TestCase;
13
14 import java.io.ByteArrayOutputStream;
15 import java.io.DataOutputStream;
16 import java.io.IOException;
17 import java.math.BigDecimal;
18 import java.util.*;
19
20 public class SwiftMQTests extends TestCase {
21     private static final String host = "localhost"; // broker host given to every Connection built in this class
22     private static final int port = 5672; // broker port given to every Connection built in this class
23     private static final int INBOUND_WINDOW = 100; // first argument of every createSession(...) call
24     private static final int OUTBOUND_WINDOW = 100; // second argument of every createSession(...) call
25     private static final int CONSUMER_LINK_CREDIT = 200; // link credit argument of every createConsumer(...) call
26     private static final String QUEUE = "/queue/test"; // address most tests produce to and consume from
27     private static final int RECEIVE_TIMEOUT = 10000; // 10 seconds timeout.
28
29     private AMQPMessage msg() {
30         AMQPMessage m = new AMQPMessage();
31         m.addData(data());
32         return m;
33     }
34
35     private Data data() {
36         return new Data("Hello World".getBytes());
37     }
38
39     public void testRoundTrip() throws Exception {
40         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
41         Connection conn = new Connection(ctx, host, port, false);
42         conn.connect();
43
44         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
45         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
46         p.send(msg());
47         p.close(); // Settlement happens here
48         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
49         AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
50         m.accept();
51         assertEquals(1, m.getData().size());
52         assertEquals(data(), m.getData().get(0));
53         conn.close();
54     }
55
56     public void testMessageFragmentation()
57             throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
58         fragmentation(512L,  512);
59         fragmentation(512L,  600);
60         fragmentation(512L,  1024);
61         fragmentation(1024L, 1024);
62     }
63
64     public void fragmentation(long FrameSize, int PayloadSize)
65             throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
66         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
67         Connection conn = new Connection(ctx, host, port, false);
68         conn.setMaxFrameSize(FrameSize);
69         conn.connect();
70         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
71
72         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
73         AMQPMessage msg = new AMQPMessage();
74         msg.addData(new Data(new byte [PayloadSize]));
75         p.send(msg);
76         p.close();
77
78         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
79         AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
80         m.accept();
81         c.close();
82         assertEquals(PayloadSize, m.getData().get(0).getValue().length);
83         conn.close();
84     }
85
86     public void testMessageAnnotations() throws Exception {
87         decorationTest(new DecorationProtocol() {
88             @Override
89             public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
90                 msg.setMessageAnnotations(new MessageAnnotations(m));
91             }
92             @Override
93             public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
94                 return msg.getMessageAnnotations().getValue();
95             }
96         }, annotationMap());
97     }
98
99     public void testFooter() throws Exception {
100         decorationTest(new DecorationProtocol() {
101             @Override
102             public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
103                 msg.setFooter(new Footer(m));
104             }
105             @Override
106             public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
107                 return msg.getFooter().getValue();
108             }
109         }, annotationMap());
110     }
111
112     public void testDataTypes() throws Exception {
113         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
114         Connection conn = new Connection(ctx, host, port, false);
115         conn.connect();
116
117         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
118         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
119         AMQPMessage msg = new AMQPMessage();
120
121         List<AMQPType> al = new ArrayList<AMQPType>();
122         al.add(new AMQPBoolean(true));
123         al.add(new AMQPByte(Byte.MAX_VALUE));
124         al.add(new AMQPChar(Character.CURRENCY_SYMBOL));
125         al.add(new AMQPDecimal64(BigDecimal.TEN));
126         al.add(new AMQPDouble(Double.NaN));
127         al.add(new AMQPInt(Integer.MIN_VALUE));
128         al.add(new AMQPNull());
129         al.add(new AMQPString("\uFFF9"));
130         al.add(new AMQPSymbol(new String(new char[256])));
131         al.add(new AMQPTimestamp(Long.MAX_VALUE));
132         al.add(new AMQPUuid(System.currentTimeMillis(), Long.MIN_VALUE));
133         al.add(new AMQPUnsignedShort(0));
134         al.add(new AMQPArray(AMQPBoolean.FALSE.getCode(), new AMQPBoolean[]{}));
135         al.add(new AmqpSequence(new ArrayList<AMQPType>()));
136         AmqpSequence seq = new AmqpSequence(al);
137         AmqpValue val = new AmqpValue(seq);
138         msg.setAmqpValue(val);
139
140         p.send(msg);
141         p.close();
142         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
143         AMQPMessage recvMsg = c.receive(RECEIVE_TIMEOUT);
144         recvMsg.accept();
145
146         assertEquals(val.getValue().getValueString(), recvMsg.getAmqpValue().getValue().getValueString());
147         conn.close();
148     }
149
150     public void testAtMostOnce() throws Exception {
151         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
152         Connection conn = new Connection(ctx, host, port, false);
153         conn.connect();
154
155         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
156         Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
157         p.send(msg());
158         p.close();
159
160         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
161         AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
162         assertTrue(m.isSettled());
163
164         s.close();
165         s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
166         c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
167         assertNull(get(c));
168         conn.close();
169     }
170
171     public void testReject() throws Exception {
172         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
173         Connection conn = new Connection(ctx, host, port, false);
174         conn.connect();
175
176         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
177         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
178         p.send(msg());
179         p.close();
180
181         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
182         AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
183         m.reject();
184         assertNull(get(c));
185         conn.close();
186     }
187
188     public void testRedelivery() throws Exception {
189         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
190         Connection conn = new Connection(ctx, host, port, false);
191         conn.connect();
192
193         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
194         Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
195         p.send(msg());
196         p.close();
197
198         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
199         AMQPMessage m1 = c.receive(RECEIVE_TIMEOUT);
200         assertTrue(m1.getHeader().getFirstAcquirer().getValue());
201         assertFalse(m1.isSettled());
202
203         s.close();
204         s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
205         c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
206         AMQPMessage m2 = c.receive(RECEIVE_TIMEOUT);
207         m2.accept();
208
209         assertTrue(compareMessageData(m1, m2));
210         assertFalse(m2.getHeader().getFirstAcquirer().getValue());
211         assertNull(get(c));
212         conn.close();
213     }
214
215     public void testRouting() throws Exception {
216         route("test",                      QUEUE,                  "",         true);
217         route(QUEUE,                      "test",                  "",         true);
218         route("test",                     "test",                  "",         true);
219
220         route("/topic/#.c.*",              "/topic/a.b.c.d",        "",        true);
221         route("/topic/#.c.*",              "/exchange/amq.topic",   "a.b.c.d", true);
222         route("/exchange/amq.topic/#.y.*", "/topic/w.x.y.z",        "",        true);
223         route("/exchange/amq.topic/#.y.*", "/exchange/amq.topic",   "w.x.y.z", true);
224
225         route("/exchange/amq.fanout/",     "/exchange/amq.fanout",  "",        true);
226         route("/exchange/amq.direct/",     "/exchange/amq.direct",  "",        true);
227         route("/exchange/amq.direct/a",    "/exchange/amq.direct",  "a",       true);
228
229         /* The following three tests rely on the queue "test" created by
230          * previous tests in this function. */
231         route("/amq/queue/test",           QUEUE,                   "",        true);
232         route(QUEUE,                       "/amq/queue/test",       "",        true);
233         route("/amq/queue/test",           "/amq/queue/test",       "",        true);
234
235         /* The following tests verify that a queue created out-of-band in AMQP
236          * is reachable from the AMQP 1.0 world. */
237         ConnectionFactory factory = new ConnectionFactory();
238         com.rabbitmq.client.Connection connection = factory.newConnection();
239         Channel channel = connection.createChannel();
240         channel.queueDeclare("transient_q", false, false, false, null);
241         route("/amq/queue/transient_q",    "/amq/queue/transient_q", "",       true);
242         channel.queueDelete("transient_q");
243         channel.queueDeclare("durable_q", true, false, false, null);
244         route("/amq/queue/durable_q",      "/amq/queue/durable_q",  "",        true);
245         channel.queueDelete("durable_q");
246         channel.queueDeclare("autodel_q", false, false, true, null);
247         route("/amq/queue/autodel_q",      "/amq/queue/autodel_q",  "",        true);
248         channel.queueDelete("autodel_q");
249         connection.close();
250
251         route("/exchange/amq.direct/b",    "/exchange/amq.direct",  "a",       false);
252         route(QUEUE,                       "/exchange/amq.fanout",  "",        false);
253         route(QUEUE,                       "/exchange/amq.headers", "",        false);
254         emptyQueue(QUEUE);
255     }
256
257     public void testRoutingInvalidRoutes() throws Exception {
258         ConnectionFactory factory = new ConnectionFactory();
259         com.rabbitmq.client.Connection connection = factory.newConnection();
260         Channel channel = connection.createChannel();
261         channel.queueDeclare("transient", false, false, false, null);
262         connection.close();
263
264         for (String dest : Arrays.asList("/exchange/missing", "/fruit/orange")) {
265             routeInvalidSource(dest);
266             routeInvalidTarget(dest);
267         }
268     }
269
270     private void emptyQueue(String q) throws Exception {
271         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
272         Connection conn = new Connection(ctx, host, port, false);
273         conn.connect();
274         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
275         Consumer c = s.createConsumer(q, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
276         AMQPMessage m;
277         while ((m = get(c)) != null);
278         conn.close();
279     }
280
281     // Whatever Consumer.receiveNoWait() does, it does not involve the drain
282     // flag, so it's clearly more a case of "have any messages arrived?" rather
283     // than "has the queue got any messages?" Therefore we have an icky timeout
284     // to give the server time to deliver messages. Really we want a way to use
285     // drain...
286     private AMQPMessage get(Consumer c) {
287         return c.receive(100);
288     }
289
290     private void route(String consumerSource, String producerTarget, String routingKey, boolean succeed) throws Exception {
291         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
292         Connection conn = new Connection(ctx, host, port, false);
293         conn.connect();
294         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
295
296         Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
297         Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
298         AMQPMessage msg = msg();
299         AmqpValue sentinel = new AmqpValue(new AMQPDouble(Math.random()));
300         msg.setAmqpValue(sentinel);
301         Properties props = new Properties();
302         props.setSubject(new AMQPString(routingKey));
303         msg.setProperties(props);
304         p.send(msg);
305
306         if (succeed) {
307             AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
308             assertNotNull(m);
309             assertEquals(sentinel.getValue().getValueString(), m.getAmqpValue().getValue().getValueString());
310             m.accept();
311         } else {
312             assertNull(get(c));
313         }
314         c.close();
315         p.close();
316         conn.close();
317     }
318
319     private void routeInvalidSource(String consumerSource) throws Exception {
320         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
321         Connection conn = new Connection(ctx, host, port, false);
322         conn.connect();
323         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
324         try {
325             Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
326             c.close();
327             fail("Source '" + consumerSource + "' should fail");
328         }
329         catch (Exception e) {
330             // no-op
331         }
332         finally {
333             conn.close();
334         }
335     }
336
337     private void routeInvalidTarget(String producerTarget) throws Exception {
338         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
339         Connection conn = new Connection(ctx, host, port, false);
340         conn.connect();
341         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
342         try {
343             Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
344             p.close();
345             fail("Target '" + producerTarget + "' should fail");
346         }
347         catch (Exception e) {
348             // no-op
349         }
350         finally {
351             conn.close();
352         }
353     }
354
355     // TODO: generalise to a comparison of all immutable parts of messages
356     private boolean compareMessageData(AMQPMessage m1, AMQPMessage m2) throws IOException {
357         ByteArrayOutputStream b1 = new ByteArrayOutputStream();
358         ByteArrayOutputStream b2 = new ByteArrayOutputStream();
359
360         m1.getData().get(0).writeContent(new DataOutputStream(b1));
361         m2.getData().get(0).writeContent(new DataOutputStream(b2));
362         return Arrays.equals(b1.toByteArray(), b2.toByteArray());
363     }
364
365     private void decorationTest(DecorationProtocol d, Map<AMQPString, AMQPType> map) throws Exception {
366         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
367         Connection conn = new Connection(ctx, host, port, false);
368         conn.connect();
369         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
370         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
371         AMQPMessage msg = msg();
372
373         d.decorateMessage(msg, map);
374         p.send(msg);
375         p.close();
376         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
377         AMQPMessage recvMsg = c.receive(RECEIVE_TIMEOUT);
378         recvMsg.accept();
379
380         compareMaps(map, d.getDecoration(recvMsg));
381         conn.close();
382     }
383
384     private void compareMaps(Map<AMQPString, AMQPType> m1, Map<AMQPType, AMQPType> m2){
385         Set e1 = m1.entrySet();
386         Set e2 = m2.entrySet();
387         assertTrue(e1.containsAll(e2));
388         assertTrue(e2.containsAll(e1));
389     }
390
391     private Map<AMQPString, AMQPType> annotationMap() throws IOException {
392         Map<AMQPString, AMQPType> annotations = new HashMap<AMQPString, AMQPType>();
393         // the spec allows keys to be symbol or ulong only, but the library only allows string
394         annotations.put(new AMQPString("key1"), new AMQPString("value1"));
395         annotations.put(new AMQPString("key2"), new AMQPString("value2"));
396         return annotations;
397     }
398
399     private interface DecorationProtocol {
400         void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException;
401         Map<AMQPType, AMQPType> getDecoration(AMQPMessage _) throws IOException;
402     }
403
404 }