]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
b58e2a5be8c11ad6ea8b9ff972f0bc0834395a3a
[packages/trusty/rabbitmq-server.git] /
1 package com.rabbitmq.amqp1_0.tests.swiftmq;
2
3 import com.swiftmq.amqp.AMQPContext;
4 import com.swiftmq.amqp.v100.client.*;
5 import com.swiftmq.amqp.v100.generated.messaging.message_format.*;
6 import com.swiftmq.amqp.v100.generated.messaging.message_format.Properties;
7 import com.swiftmq.amqp.v100.messaging.AMQPMessage;
8 import com.swiftmq.amqp.v100.types.*;
9 import junit.framework.TestCase;
10
11 import java.io.ByteArrayOutputStream;
12 import java.io.DataOutputStream;
13 import java.io.IOException;
14 import java.math.BigDecimal;
15 import java.util.*;
16
17 public class SwiftMQTests extends TestCase {
18     private static final String host = "localhost";
19     private static final int port = 5672;
20     private static final int INBOUND_WINDOW = 100;
21     private static final int OUTBOUND_WINDOW = 100;
22     private static final int CONSUMER_LINK_CREDIT = 200;
23     private static final String QUEUE = "/queue/test";
24
25     private AMQPMessage msg() {
26         AMQPMessage m = new AMQPMessage();
27         m.addData(data());
28         return m;
29     }
30
31     private Data data() {
32         return new Data("Hello World".getBytes());
33     }
34
35     public void testRoundTrip() throws Exception {
36         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
37         Connection conn = new Connection(ctx, host, port, false);
38         conn.connect();
39
40         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
41         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
42         p.send(msg());
43         p.close(); // Settlement happens here
44         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
45         AMQPMessage m = c.receive();
46         m.accept();
47         assertEquals(1, m.getData().size());
48         assertEquals(data(), m.getData().get(0));
49         conn.close();
50     }
51
52     public void testMessageFragmentation()
53             throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
54         fragmentation(512L,  512);
55         fragmentation(512L,  600);
56         fragmentation(512L,  1024);
57         fragmentation(1024L, 1024);
58     }
59
60     public void fragmentation(long FrameSize, int PayloadSize)
61             throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
62         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
63         Connection conn = new Connection(ctx, host, port, false);
64         conn.setMaxFrameSize(FrameSize);
65         conn.connect();
66         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
67
68         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
69         AMQPMessage msg = new AMQPMessage();
70         msg.addData(new Data(new byte [PayloadSize]));
71         p.send(msg);
72         p.close();
73
74         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
75         AMQPMessage m = c.receive();
76         m.accept();
77         c.close();
78         assertEquals(PayloadSize, m.getData().get(0).getValue().length);
79         conn.close();
80     }
81
82     public void testMessageAnnotations() throws Exception {
83         decorationTest(new DecorationProtocol() {
84             @Override
85             public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
86                 msg.setMessageAnnotations(new MessageAnnotations(m));
87             }
88             @Override
89             public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
90                 return msg.getMessageAnnotations().getValue();
91             }
92         }, annotationMap());
93     }
94
95     public void testFooter() throws Exception {
96         decorationTest(new DecorationProtocol() {
97             @Override
98             public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
99                 msg.setFooter(new Footer(m));
100             }
101             @Override
102             public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
103                 return msg.getFooter().getValue();
104             }
105         }, annotationMap());
106     }
107
108     public void testDataTypes() throws Exception {
109         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
110         Connection conn = new Connection(ctx, host, port, false);
111         conn.connect();
112
113         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
114         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
115         AMQPMessage msg = new AMQPMessage();
116
117         List<AMQPType> al = new ArrayList<AMQPType>();
118         al.add(new AMQPBoolean(true));
119         al.add(new AMQPByte(Byte.MAX_VALUE));
120         al.add(new AMQPChar(Character.CURRENCY_SYMBOL));
121         al.add(new AMQPDecimal64(BigDecimal.TEN));
122         al.add(new AMQPDouble(Double.NaN));
123         al.add(new AMQPInt(Integer.MIN_VALUE));
124         al.add(new AMQPNull());
125         al.add(new AMQPString("\uFFF9"));
126         al.add(new AMQPSymbol(new String(new char[256])));
127         al.add(new AMQPTimestamp(Long.MAX_VALUE));
128         al.add(new AMQPUuid(System.currentTimeMillis(), Long.MIN_VALUE));
129         al.add(new AMQPUnsignedShort(0));
130         al.add(new AMQPArray(AMQPBoolean.FALSE.getCode(), new AMQPBoolean[]{}));
131         al.add(new AmqpSequence(new ArrayList<AMQPType>()));
132         AmqpSequence seq = new AmqpSequence(al);
133         AmqpValue val = new AmqpValue(seq);
134         msg.setAmqpValue(val);
135
136         p.send(msg);
137         p.close();
138         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
139         AMQPMessage recvMsg = c.receive();
140         recvMsg.accept();
141
142         assertEquals(val.getValue().getValueString(), recvMsg.getAmqpValue().getValue().getValueString());
143         conn.close();
144     }
145
146     public void testAtMostOnce() throws Exception {
147         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
148         Connection conn = new Connection(ctx, host, port, false);
149         conn.connect();
150
151         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
152         Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
153         p.send(msg());
154         p.close();
155
156         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
157         AMQPMessage m = c.receive();
158         assertTrue(m.isSettled());
159
160         s.close();
161         s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
162         c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
163         assertNull(get(c));
164         conn.close();
165     }
166
167     public void testReject() throws Exception {
168         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
169         Connection conn = new Connection(ctx, host, port, false);
170         conn.connect();
171
172         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
173         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
174         p.send(msg());
175         p.close();
176
177         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
178         AMQPMessage m = c.receive();
179         m.reject();
180         assertNull(get(c));
181         conn.close();
182     }
183
184     public void testRedelivery() throws Exception {
185         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
186         Connection conn = new Connection(ctx, host, port, false);
187         conn.connect();
188
189         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
190         Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
191         p.send(msg());
192         p.close();
193
194         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
195         AMQPMessage m1 = c.receive();
196         assertTrue(m1.getHeader().getFirstAcquirer().getValue());
197         assertFalse(m1.isSettled());
198
199         s.close();
200         s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
201         c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
202         AMQPMessage m2 = c.receive();
203         m2.accept();
204
205         assertTrue(compareMessageData(m1, m2));
206         assertFalse(m2.getHeader().getFirstAcquirer().getValue());
207         assertNull(get(c));
208         conn.close();
209     }
210
211     public void testRouting() throws Exception {
212         route("test",                      QUEUE,                  "",         true);
213         route(QUEUE,                      "test",                  "",         true);
214         route("test",                     "test",                  "",         true);
215
216         try {
217             route(QUEUE,                  "/exchange/missing",    "",        false);
218             fail("Missing exchange should fail");
219         } catch (Exception e) { }
220
221         try {
222             route("/exchange/missing/",    QUEUE,                  "",        false);
223             fail("Missing exchange should fail");
224         } catch (Exception e) { }
225
226         route("/topic/#.c.*",              "/topic/a.b.c.d",        "",        true);
227         route("/topic/#.c.*",              "/exchange/amq.topic",   "a.b.c.d", true);
228         route("/exchange/amq.topic/#.y.*", "/topic/w.x.y.z",        "",        true);
229         route("/exchange/amq.topic/#.y.*", "/exchange/amq.topic",   "w.x.y.z", true);
230
231         route("/exchange/amq.fanout/",     "/exchange/amq.fanout",  "",        true);
232         route("/exchange/amq.direct/",     "/exchange/amq.direct",  "",        true);
233         route("/exchange/amq.direct/a",    "/exchange/amq.direct",  "a",       true);
234
235         route("/amq/queue/test",           QUEUE,                   "",        true);
236         route(QUEUE,                       "/amq/queue/test",       "",        true);
237         route("/amq/queue/test",           "/amq/queue/test",       "",        true);
238
239         route("/exchange/amq.direct/b",    "/exchange/amq.direct",  "a",       false);
240         route(QUEUE,                       "/exchange/amq.fanout",  "",        false);
241         route(QUEUE,                       "/exchange/amq.headers", "",        false);
242         emptyQueue(QUEUE);
243     }
244
245     private void emptyQueue(String q) throws Exception {
246         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
247         Connection conn = new Connection(ctx, host, port, false);
248         conn.connect();
249         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
250         Consumer c = s.createConsumer(q, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
251         AMQPMessage m;
252         while ((m = get(c)) != null);
253         conn.close();
254     }
255
256     // Whatever Consumer.receiveNoWait() does, it does not involve the drain
257     // flag, so it's clearly more a case of "have any messages arrived?" rather
258     // than "has the queue got any messages?" Therefore we have an icky timeout
259     // to give the server time to deliver messages. Really we want a way to use
260     // drain...
261     private AMQPMessage get(Consumer c) {
262         return c.receive(100);
263     }
264
265     private void route(String consumerSource, String producerTarget, String routingKey, boolean succeed) throws Exception {
266         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
267         Connection conn = new Connection(ctx, host, port, false);
268         conn.connect();
269         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
270
271         Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
272         Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
273         AMQPMessage msg = msg();
274         AmqpValue sentinel = new AmqpValue(new AMQPDouble(Math.random()));
275         msg.setAmqpValue(sentinel);
276         Properties props = new Properties();
277         props.setSubject(new AMQPString(routingKey));
278         msg.setProperties(props);
279         p.send(msg);
280
281         if (succeed) {
282             AMQPMessage m = c.receive();
283             assertNotNull(m);
284             assertEquals(sentinel.getValue().getValueString(), m.getAmqpValue().getValue().getValueString());
285             m.accept();
286         } else {
287             assertNull(get(c));
288         }
289         c.close();
290         p.close();
291         conn.close();
292     }
293
294     // TODO: generalise to a comparison of all immutable parts of messages
295     private boolean compareMessageData(AMQPMessage m1, AMQPMessage m2) throws IOException {
296         ByteArrayOutputStream b1 = new ByteArrayOutputStream();
297         ByteArrayOutputStream b2 = new ByteArrayOutputStream();
298
299         m1.getData().get(0).writeContent(new DataOutputStream(b1));
300         m2.getData().get(0).writeContent(new DataOutputStream(b2));
301         return Arrays.equals(b1.toByteArray(), b2.toByteArray());
302     }
303
304     private void decorationTest(DecorationProtocol d, Map<AMQPString, AMQPType> map) throws Exception {
305         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
306         Connection conn = new Connection(ctx, host, port, false);
307         conn.connect();
308         Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
309         Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
310         AMQPMessage msg = msg();
311
312         d.decorateMessage(msg, map);
313         p.send(msg);
314         p.close();
315         Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
316         AMQPMessage recvMsg = c.receive();
317         recvMsg.accept();
318
319         compareMaps(map, d.getDecoration(recvMsg));
320         conn.close();
321     }
322
323     private void compareMaps(Map<AMQPString, AMQPType> m1, Map<AMQPType, AMQPType> m2){
324         Set e1 = m1.entrySet();
325         Set e2 = m2.entrySet();
326         assertTrue(e1.containsAll(e2));
327         assertTrue(e2.containsAll(e1));
328     }
329
330     private Map<AMQPString, AMQPType> annotationMap() throws IOException {
331         Map<AMQPString, AMQPType> annotations = new HashMap<AMQPString, AMQPType>();
332         // the spec allows keys to be symbol or ulong only, but the library only allows string
333         annotations.put(new AMQPString("key1"), new AMQPString("value1"));
334         annotations.put(new AMQPString("key2"), new AMQPString("value2"));
335         return annotations;
336     }
337
338     private interface DecorationProtocol {
339         void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException;
340         Map<AMQPType, AMQPType> getDecoration(AMQPMessage _) throws IOException;
341     }
342
343 }