1 package com.rabbitmq.amqp1_0.tests.swiftmq;
3 import com.swiftmq.amqp.AMQPContext;
4 import com.swiftmq.amqp.v100.client.*;
5 import com.swiftmq.amqp.v100.generated.messaging.message_format.*;
6 import com.swiftmq.amqp.v100.generated.messaging.message_format.Properties;
7 import com.swiftmq.amqp.v100.messaging.AMQPMessage;
8 import com.swiftmq.amqp.v100.types.*;
9 import junit.framework.TestCase;
11 import java.io.ByteArrayOutputStream;
12 import java.io.DataOutputStream;
13 import java.io.IOException;
14 import java.math.BigDecimal;
17 public class SwiftMQTests extends TestCase {
18 private static final String host = "localhost";
19 private static final int port = 5672;
20 private static final int INBOUND_WINDOW = 100;
21 private static final int OUTBOUND_WINDOW = 100;
22 private static final int CONSUMER_LINK_CREDIT = 200;
23 private static final String QUEUE = "/queue/test";
25 private AMQPMessage msg() {
26 AMQPMessage m = new AMQPMessage();
32 return new Data("Hello World".getBytes());
35 public void testRoundTrip() throws Exception {
36 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
37 Connection conn = new Connection(ctx, host, port, false);
40 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
41 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
43 p.close(); // Settlement happens here
44 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
45 AMQPMessage m = c.receive();
47 assertEquals(1, m.getData().size());
48 assertEquals(data(), m.getData().get(0));
52 public void testMessageFragmentation()
53 throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
54 fragmentation(512L, 512);
55 fragmentation(512L, 600);
56 fragmentation(512L, 1024);
57 fragmentation(1024L, 1024);
60 public void fragmentation(long FrameSize, int PayloadSize)
61 throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
62 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
63 Connection conn = new Connection(ctx, host, port, false);
64 conn.setMaxFrameSize(FrameSize);
66 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
68 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
69 AMQPMessage msg = new AMQPMessage();
70 msg.addData(new Data(new byte [PayloadSize]));
74 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
75 AMQPMessage m = c.receive();
78 assertEquals(PayloadSize, m.getData().get(0).getValue().length);
82 public void testMessageAnnotations() throws Exception {
83 decorationTest(new DecorationProtocol() {
85 public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
86 msg.setMessageAnnotations(new MessageAnnotations(m));
89 public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
90 return msg.getMessageAnnotations().getValue();
95 public void testFooter() throws Exception {
96 decorationTest(new DecorationProtocol() {
98 public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
99 msg.setFooter(new Footer(m));
102 public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
103 return msg.getFooter().getValue();
108 public void testDataTypes() throws Exception {
109 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
110 Connection conn = new Connection(ctx, host, port, false);
113 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
114 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
115 AMQPMessage msg = new AMQPMessage();
117 List<AMQPType> al = new ArrayList<AMQPType>();
118 al.add(new AMQPBoolean(true));
119 al.add(new AMQPByte(Byte.MAX_VALUE));
120 al.add(new AMQPChar(Character.CURRENCY_SYMBOL));
121 al.add(new AMQPDecimal64(BigDecimal.TEN));
122 al.add(new AMQPDouble(Double.NaN));
123 al.add(new AMQPInt(Integer.MIN_VALUE));
124 al.add(new AMQPNull());
125 al.add(new AMQPString("\uFFF9"));
126 al.add(new AMQPSymbol(new String(new char[256])));
127 al.add(new AMQPTimestamp(Long.MAX_VALUE));
128 al.add(new AMQPUuid(System.currentTimeMillis(), Long.MIN_VALUE));
129 al.add(new AMQPUnsignedShort(0));
130 al.add(new AMQPArray(AMQPBoolean.FALSE.getCode(), new AMQPBoolean[]{}));
131 al.add(new AmqpSequence(new ArrayList<AMQPType>()));
132 AmqpSequence seq = new AmqpSequence(al);
133 AmqpValue val = new AmqpValue(seq);
134 msg.setAmqpValue(val);
138 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
139 AMQPMessage recvMsg = c.receive();
142 assertEquals(val.getValue().getValueString(), recvMsg.getAmqpValue().getValue().getValueString());
146 public void testAtMostOnce() throws Exception {
147 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
148 Connection conn = new Connection(ctx, host, port, false);
151 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
152 Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
156 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
157 AMQPMessage m = c.receive();
158 assertTrue(m.isSettled());
161 s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
162 c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
167 public void testReject() throws Exception {
168 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
169 Connection conn = new Connection(ctx, host, port, false);
172 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
173 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
177 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
178 AMQPMessage m = c.receive();
184 public void testRedelivery() throws Exception {
185 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
186 Connection conn = new Connection(ctx, host, port, false);
189 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
190 Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
194 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
195 AMQPMessage m1 = c.receive();
196 assertTrue(m1.getHeader().getFirstAcquirer().getValue());
197 assertFalse(m1.isSettled());
200 s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
201 c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
202 AMQPMessage m2 = c.receive();
205 assertTrue(compareMessageData(m1, m2));
206 assertFalse(m2.getHeader().getFirstAcquirer().getValue());
211 public void testRouting() throws Exception {
212 route("test", QUEUE, "", true);
213 route(QUEUE, "test", "", true);
214 route("test", "test", "", true);
217 route(QUEUE, "/exchange/missing", "", false);
218 fail("Missing exchange should fail");
219 } catch (Exception e) { }
222 route("/exchange/missing/", QUEUE, "", false);
223 fail("Missing exchange should fail");
224 } catch (Exception e) { }
226 route("/topic/#.c.*", "/topic/a.b.c.d", "", true);
227 route("/topic/#.c.*", "/exchange/amq.topic", "a.b.c.d", true);
228 route("/exchange/amq.topic/#.y.*", "/topic/w.x.y.z", "", true);
229 route("/exchange/amq.topic/#.y.*", "/exchange/amq.topic", "w.x.y.z", true);
231 route("/exchange/amq.fanout/", "/exchange/amq.fanout", "", true);
232 route("/exchange/amq.direct/", "/exchange/amq.direct", "", true);
233 route("/exchange/amq.direct/a", "/exchange/amq.direct", "a", true);
235 route("/amq/queue/test", QUEUE, "", true);
236 route(QUEUE, "/amq/queue/test", "", true);
237 route("/amq/queue/test", "/amq/queue/test", "", true);
239 route("/exchange/amq.direct/b", "/exchange/amq.direct", "a", false);
240 route(QUEUE, "/exchange/amq.fanout", "", false);
241 route(QUEUE, "/exchange/amq.headers", "", false);
245 private void emptyQueue(String q) throws Exception {
246 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
247 Connection conn = new Connection(ctx, host, port, false);
249 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
250 Consumer c = s.createConsumer(q, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
252 while ((m = get(c)) != null);
256 // Whatever Consumer.receiveNoWait() does, it does not involve the drain
257 // flag, so it's clearly more a case of "have any messages arrived?" rather
258 // than "has the queue got any messages?" Therefore we have an icky timeout
259 // to give the server time to deliver messages. Really we want a way to use
261 private AMQPMessage get(Consumer c) {
262 return c.receive(100);
265 private void route(String consumerSource, String producerTarget, String routingKey, boolean succeed) throws Exception {
266 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
267 Connection conn = new Connection(ctx, host, port, false);
269 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
271 Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
272 Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
273 AMQPMessage msg = msg();
274 AmqpValue sentinel = new AmqpValue(new AMQPDouble(Math.random()));
275 msg.setAmqpValue(sentinel);
276 Properties props = new Properties();
277 props.setSubject(new AMQPString(routingKey));
278 msg.setProperties(props);
282 AMQPMessage m = c.receive();
284 assertEquals(sentinel.getValue().getValueString(), m.getAmqpValue().getValue().getValueString());
294 // TODO: generalise to a comparison of all immutable parts of messages
295 private boolean compareMessageData(AMQPMessage m1, AMQPMessage m2) throws IOException {
296 ByteArrayOutputStream b1 = new ByteArrayOutputStream();
297 ByteArrayOutputStream b2 = new ByteArrayOutputStream();
299 m1.getData().get(0).writeContent(new DataOutputStream(b1));
300 m2.getData().get(0).writeContent(new DataOutputStream(b2));
301 return Arrays.equals(b1.toByteArray(), b2.toByteArray());
304 private void decorationTest(DecorationProtocol d, Map<AMQPString, AMQPType> map) throws Exception {
305 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
306 Connection conn = new Connection(ctx, host, port, false);
308 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
309 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
310 AMQPMessage msg = msg();
312 d.decorateMessage(msg, map);
315 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
316 AMQPMessage recvMsg = c.receive();
319 compareMaps(map, d.getDecoration(recvMsg));
323 private void compareMaps(Map<AMQPString, AMQPType> m1, Map<AMQPType, AMQPType> m2){
324 Set e1 = m1.entrySet();
325 Set e2 = m2.entrySet();
326 assertTrue(e1.containsAll(e2));
327 assertTrue(e2.containsAll(e1));
330 private Map<AMQPString, AMQPType> annotationMap() throws IOException {
331 Map<AMQPString, AMQPType> annotations = new HashMap<AMQPString, AMQPType>();
332 // the spec allows keys to be symbol or ulong only, but the library only allows string
333 annotations.put(new AMQPString("key1"), new AMQPString("value1"));
334 annotations.put(new AMQPString("key2"), new AMQPString("value2"));
338 private interface DecorationProtocol {
339 void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException;
340 Map<AMQPType, AMQPType> getDecoration(AMQPMessage _) throws IOException;