1 package com.rabbitmq.amqp1_0.tests.swiftmq;
3 import com.rabbitmq.client.*;
4 import com.swiftmq.amqp.AMQPContext;
5 import com.swiftmq.amqp.v100.client.*;
6 import com.swiftmq.amqp.v100.client.Connection;
7 import com.swiftmq.amqp.v100.client.Consumer;
8 import com.swiftmq.amqp.v100.generated.messaging.message_format.*;
9 import com.swiftmq.amqp.v100.generated.messaging.message_format.Properties;
10 import com.swiftmq.amqp.v100.messaging.AMQPMessage;
11 import com.swiftmq.amqp.v100.types.*;
12 import junit.framework.TestCase;
14 import java.io.ByteArrayOutputStream;
15 import java.io.DataOutputStream;
16 import java.io.IOException;
17 import java.math.BigDecimal;
// JUnit 3 style test case (extends junit.framework.TestCase) whose tests talk
// to a broker on localhost:5672 through the SwiftMQ AMQP 1.0 client classes.
20 public class SwiftMQTests extends TestCase {
// Broker address passed to every `new Connection(ctx, host, port, false)`.
21 private static final String host = "localhost";
22 private static final int port = 5672;
// Window sizes passed to Connection.createSession(...) by every test.
23 private static final int INBOUND_WINDOW = 100;
24 private static final int OUTBOUND_WINDOW = 100;
// Link credit passed to Session.createConsumer(...) by every test.
25 private static final int CONSUMER_LINK_CREDIT = 200;
// Default source/target address for tests that do not pass their own address.
26 private static final String QUEUE = "/queue/test";
// Timeout passed to Consumer.receive(...) when a message is expected.
27 private static final int RECEIVE_TIMEOUT = 10000; // 10 seconds timeout.
// Builds the AMQPMessage used as the default payload by route() and
// decorationTest(). Only the allocation is visible here; the statements that
// populate and return `m` are not part of this view.
29 private AMQPMessage msg() {
30 AMQPMessage m = new AMQPMessage();
// NOTE(review): this return yields a Data, not an AMQPMessage, so it presumably
// belongs to a separate helper (testRoundTrip compares against `data()`) whose
// header is missing from this view -- confirm against the full file.
// The payload is the bytes of "Hello World" in the platform default charset.
36 return new Data("Hello World".getBytes());
// Round-trip test: a message goes through QUEUE with at-least-once QoS and the
// received copy must carry exactly one data section equal to data().
// The connect and send statements are not visible in this view.
39 public void testRoundTrip() throws Exception {
40 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
41 Connection conn = new Connection(ctx, host, port, false);
44 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
45 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
// The producer is closed before the consumer is created on the same session.
47 p.close(); // Settlement happens here
48 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
49 AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
// Exactly one data section is expected, and it must equal the reference payload.
51 assertEquals(1, m.getData().size());
52 assertEquals(data(), m.getData().get(0));
// Runs fragmentation(FrameSize, PayloadSize) for four combinations: payload
// equal to the frame size (512/512 and 1024/1024) and payload larger than the
// frame size (512/600 and 512/1024).
56 public void testMessageFragmentation()
57 throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
58 fragmentation(512L, 512);
59 fragmentation(512L, 600);
60 fragmentation(512L, 1024);
61 fragmentation(1024L, 1024);
// Helper for testMessageFragmentation.
//   FrameSize   - value passed to Connection.setMaxFrameSize(...)
//   PayloadSize - length of the zero-filled byte array placed in the message
// The received message's first data section must have length PayloadSize.
// The connect and send statements are not visible in this view.
64 public void fragmentation(long FrameSize, int PayloadSize)
65 throws UnsupportedProtocolVersionException, AMQPException, AuthenticationException, IOException {
66 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
67 Connection conn = new Connection(ctx, host, port, false);
// The frame size limit is set on the connection before the session is created.
68 conn.setMaxFrameSize(FrameSize);
70 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
72 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
73 AMQPMessage msg = new AMQPMessage();
// Only the length of the payload is checked below; its content is all zeros.
74 msg.addData(new Data(new byte [PayloadSize]));
78 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
79 AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
82 assertEquals(PayloadSize, m.getData().get(0).getValue().length);
// Runs decorationTest with a DecorationProtocol that stores the map as the
// message's MessageAnnotations section and reads it back from that section.
// The second argument of the decorationTest call is not visible in this view.
86 public void testMessageAnnotations() throws Exception {
87 decorationTest(new DecorationProtocol() {
// Write side: wrap the map in a MessageAnnotations section.
89 public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
90 msg.setMessageAnnotations(new MessageAnnotations(m));
// Read side: return the map held by the received message's annotations.
93 public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
94 return msg.getMessageAnnotations().getValue();
// Runs decorationTest with a DecorationProtocol that stores the map as the
// message's Footer section and reads it back from that section.
// The second argument of the decorationTest call is not visible in this view.
99 public void testFooter() throws Exception {
100 decorationTest(new DecorationProtocol() {
// Write side: wrap the map in a Footer section.
102 public void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException {
103 msg.setFooter(new Footer(m));
// Read side: return the map held by the received message's footer.
106 public Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException {
107 return msg.getFooter().getValue();
// Sends one message whose AmqpValue body is a sequence holding one value of
// many AMQP types, then checks that the received body has the same value string
// as the sent one. The connect and send statements are not visible in this view.
112 public void testDataTypes() throws Exception {
113 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
114 Connection conn = new Connection(ctx, host, port, false);
117 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
118 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
119 AMQPMessage msg = new AMQPMessage();
// One element per type under test; several use extreme or unusual values
// (MAX/MIN constants, NaN, a 256-character symbol, empty array and sequence).
121 List<AMQPType> al = new ArrayList<AMQPType>();
122 al.add(new AMQPBoolean(true));
123 al.add(new AMQPByte(Byte.MAX_VALUE));
124 al.add(new AMQPChar(Character.CURRENCY_SYMBOL));
125 al.add(new AMQPDecimal64(BigDecimal.TEN));
126 al.add(new AMQPDouble(Double.NaN));
127 al.add(new AMQPInt(Integer.MIN_VALUE));
128 al.add(new AMQPNull());
129 al.add(new AMQPString("\uFFF9"));
130 al.add(new AMQPSymbol(new String(new char[256])));
131 al.add(new AMQPTimestamp(Long.MAX_VALUE));
132 al.add(new AMQPUuid(System.currentTimeMillis(), Long.MIN_VALUE));
133 al.add(new AMQPUnsignedShort(0));
134 al.add(new AMQPArray(AMQPBoolean.FALSE.getCode(), new AMQPBoolean[]{}));
135 al.add(new AmqpSequence(new ArrayList<AMQPType>()));
// The list is wrapped in a sequence, which becomes the message's AmqpValue body.
136 AmqpSequence seq = new AmqpSequence(al);
137 AmqpValue val = new AmqpValue(seq);
138 msg.setAmqpValue(val);
142 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
143 AMQPMessage recvMsg = c.receive(RECEIVE_TIMEOUT);
// The comparison is made on getValueString() of the sent and received bodies.
146 assertEquals(val.getValue().getValueString(), recvMsg.getAmqpValue().getValue().getValueString());
// At-most-once test: producer and consumer both use QoS.AT_MOST_ONCE, and the
// received message must already be settled. A second session and consumer are
// then opened on the same queue; what is asserted with them is not visible in
// this view, nor are the connect and send statements.
150 public void testAtMostOnce() throws Exception {
151 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
152 Connection conn = new Connection(ctx, host, port, false);
155 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
156 Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
160 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
161 AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
162 assertTrue(m.isSettled());
// Both `s` and `c` are reassigned to a fresh session and consumer on QUEUE.
165 s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
166 c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
// Reject test: opens an at-least-once producer and consumer on QUEUE and
// receives one message. The send, the rejection and any assertions are not
// visible in this view.
171 public void testReject() throws Exception {
172 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
173 Connection conn = new Connection(ctx, host, port, false);
176 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
177 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
181 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
182 AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
// Redelivery test: a message received by an at-least-once consumer is received
// again by a second consumer on a new session. The first delivery must be
// flagged first-acquirer and unsettled; the second must carry the same data and
// must not be flagged first-acquirer. The connect and send statements, and
// whatever happens to the first session between the two receives, are not
// visible in this view.
188 public void testRedelivery() throws Exception {
189 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
190 Connection conn = new Connection(ctx, host, port, false);
193 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
// The producer is at-most-once while the consumers are at-least-once.
194 Producer p = s.createProducer(QUEUE, QoS.AT_MOST_ONCE);
198 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
199 AMQPMessage m1 = c.receive(RECEIVE_TIMEOUT);
200 assertTrue(m1.getHeader().getFirstAcquirer().getValue());
201 assertFalse(m1.isSettled());
// `s` and `c` are replaced by a fresh session and consumer on the same queue.
204 s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
205 c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
206 AMQPMessage m2 = c.receive(RECEIVE_TIMEOUT);
// Same payload bytes as the first delivery, but no longer the first acquirer.
209 assertTrue(compareMessageData(m1, m2));
210 assertFalse(m2.getHeader().getFirstAcquirer().getValue());
// Exercises route(consumerSource, producerTarget, routingKey, succeed) over
// many address combinations. All calls pass succeed=true except the last three.
215 public void testRouting() throws Exception {
// Plain names and the QUEUE address, in both directions.
216 route("test", QUEUE, "", true);
217 route(QUEUE, "test", "", true);
218 route("test", "test", "", true);
// Topic patterns on the consumer side, with the key given either in the
// producer address or as the routing key.
220 route("/topic/#.c.*", "/topic/a.b.c.d", "", true);
221 route("/topic/#.c.*", "/exchange/amq.topic", "a.b.c.d", true);
222 route("/exchange/amq.topic/#.y.*", "/topic/w.x.y.z", "", true);
223 route("/exchange/amq.topic/#.y.*", "/exchange/amq.topic", "w.x.y.z", true);
// Addresses of the form /exchange/<name>/<key> on the consumer side.
225 route("/exchange/amq.fanout/", "/exchange/amq.fanout", "", true);
226 route("/exchange/amq.direct/", "/exchange/amq.direct", "", true);
227 route("/exchange/amq.direct/a", "/exchange/amq.direct", "a", true);
229 /* The following three tests rely on the queue "test" created by
230 * previous tests in this function. */
231 route("/amq/queue/test", QUEUE, "", true);
232 route(QUEUE, "/amq/queue/test", "", true);
233 route("/amq/queue/test", "/amq/queue/test", "", true);
235 /* The following tests verify that a queue created out-of-band in AMQP
236 * is reachable from the AMQP 1.0 world. */
// The queues are declared and deleted through com.rabbitmq.client, using a
// ConnectionFactory with default settings.
237 ConnectionFactory factory = new ConnectionFactory();
238 com.rabbitmq.client.Connection connection = factory.newConnection();
239 Channel channel = connection.createChannel();
// Each queue is declared with different flags, routed through, then deleted.
240 channel.queueDeclare("transient_q", false, false, false, null);
241 route("/amq/queue/transient_q", "/amq/queue/transient_q", "", true);
242 channel.queueDelete("transient_q");
243 channel.queueDeclare("durable_q", true, false, false, null);
244 route("/amq/queue/durable_q", "/amq/queue/durable_q", "", true);
245 channel.queueDelete("durable_q");
246 channel.queueDeclare("autodel_q", false, false, true, null);
247 route("/amq/queue/autodel_q", "/amq/queue/autodel_q", "", true);
248 channel.queueDelete("autodel_q");
// Combinations called with succeed=false.
251 route("/exchange/amq.direct/b", "/exchange/amq.direct", "a", false);
252 route(QUEUE, "/exchange/amq.fanout", "", false);
253 route(QUEUE, "/exchange/amq.headers", "", false);
// Declares a queue named "transient" through com.rabbitmq.client, then checks
// that each listed address is rejected both as a consumer source and as a
// producer target. Lines between the declaration and the loop are not visible
// in this view.
257 public void testRoutingInvalidRoutes() throws Exception {
258 ConnectionFactory factory = new ConnectionFactory();
259 com.rabbitmq.client.Connection connection = factory.newConnection();
260 Channel channel = connection.createChannel();
261 channel.queueDeclare("transient", false, false, false, null);
// Each address is tried in both roles.
264 for (String dest : Arrays.asList("/exchange/missing", "/fruit/orange")) {
265 routeInvalidSource(dest);
266 routeInvalidTarget(dest);
// Drains the address `q`: opens an at-most-once consumer on it and keeps
// calling get(c) until that returns null. The declaration of `m` and the
// connect/close statements are not visible in this view.
270 private void emptyQueue(String q) throws Exception {
271 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
272 Connection conn = new Connection(ctx, host, port, false);
274 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
275 Consumer c = s.createConsumer(q, CONSUMER_LINK_CREDIT, QoS.AT_MOST_ONCE, false, null);
// Empty loop body on purpose: every received message is simply discarded.
277 while ((m = get(c)) != null);
281 // Whatever Consumer.receiveNoWait() does, it does not involve the drain
282 // flag, so it's clearly more a case of "have any messages arrived?" rather
283 // than "has the queue got any messages?" Therefore we have an icky timeout
284 // to give the server time to deliver messages. Really we want a way to use
// Receives one message from `c` with a timeout argument of 100, in the same
// units as RECEIVE_TIMEOUT. emptyQueue() treats a null return as "no more
// messages".
286 private AMQPMessage get(Consumer c) {
287 return c.receive(100);
// Sends one message to `producerTarget` and receives from `consumerSource`.
//   consumerSource - address the consumer is attached to
//   producerTarget - address the producer is attached to
//   routingKey     - set as the message's `subject` property
//   succeed        - whether delivery is expected; the branch that reads this
//                    flag is not visible in this view
// The message body is a random double used as a sentinel, so the received
// message can be matched to the one sent. The connect and send statements are
// not visible in this view.
290 private void route(String consumerSource, String producerTarget, String routingKey, boolean succeed) throws Exception {
291 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
292 Connection conn = new Connection(ctx, host, port, false);
294 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
// The consumer is created before the producer.
296 Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
297 Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
298 AMQPMessage msg = msg();
299 AmqpValue sentinel = new AmqpValue(new AMQPDouble(Math.random()));
300 msg.setAmqpValue(sentinel);
301 Properties props = new Properties();
302 props.setSubject(new AMQPString(routingKey));
303 msg.setProperties(props);
307 AMQPMessage m = c.receive(RECEIVE_TIMEOUT);
// The received body must have the same value string as the sentinel.
309 assertEquals(sentinel.getValue().getValueString(), m.getAmqpValue().getValue().getValueString());
// Expects that creating a consumer on `consumerSource` fails. If the statement
// after createConsumer is reached, the test fails with a message naming the
// source. A catch (Exception e) clause follows; its body and the opening `try`
// are not visible in this view.
319 private void routeInvalidSource(String consumerSource) throws Exception {
320 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
321 Connection conn = new Connection(ctx, host, port, false);
323 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
325 Consumer c = s.createConsumer(consumerSource, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
327 fail("Source '" + consumerSource + "' should fail");
329 catch (Exception e) {
// Expects that creating a producer on `producerTarget` fails. If the statement
// after createProducer is reached, the test fails with a message naming the
// target. A catch (Exception e) clause follows; its body and the opening `try`
// are not visible in this view.
337 private void routeInvalidTarget(String producerTarget) throws Exception {
338 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
339 Connection conn = new Connection(ctx, host, port, false);
341 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
343 Producer p = s.createProducer(producerTarget, QoS.AT_LEAST_ONCE);
345 fail("Target '" + producerTarget + "' should fail");
347 catch (Exception e) {
355 // TODO: generalise to a comparison of all immutable parts of messages
// Returns true when the first data section of `m1` and of `m2` serialise to
// identical bytes via writeContent(...). Only index 0 of getData() is compared;
// other sections and other message parts are ignored.
356 private boolean compareMessageData(AMQPMessage m1, AMQPMessage m2) throws IOException {
357 ByteArrayOutputStream b1 = new ByteArrayOutputStream();
358 ByteArrayOutputStream b2 = new ByteArrayOutputStream();
// Each section is written into its own in-memory buffer.
360 m1.getData().get(0).writeContent(new DataOutputStream(b1));
361 m2.getData().get(0).writeContent(new DataOutputStream(b2));
362 return Arrays.equals(b1.toByteArray(), b2.toByteArray());
// Shared body of testMessageAnnotations and testFooter.
//   d   - strategy that writes `map` into a message section and reads it back
//   map - decoration expected to survive the round trip
// The default message from msg() is decorated through `d`, and the decoration
// read from the received message must match `map` (see compareMaps). The
// connect and send statements are not visible in this view.
365 private void decorationTest(DecorationProtocol d, Map<AMQPString, AMQPType> map) throws Exception {
366 AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
367 Connection conn = new Connection(ctx, host, port, false);
369 Session s = conn.createSession(INBOUND_WINDOW, OUTBOUND_WINDOW);
370 Producer p = s.createProducer(QUEUE, QoS.AT_LEAST_ONCE);
371 AMQPMessage msg = msg();
373 d.decorateMessage(msg, map);
376 Consumer c = s.createConsumer(QUEUE, CONSUMER_LINK_CREDIT, QoS.AT_LEAST_ONCE, false, null);
377 AMQPMessage recvMsg = c.receive(RECEIVE_TIMEOUT);
380 compareMaps(map, d.getDecoration(recvMsg));
// Asserts that `m1` and `m2` hold the same entries: each entry set must contain
// all of the other's. Raw Set types are used because the two maps have
// different key type parameters (AMQPString vs AMQPType).
384 private void compareMaps(Map<AMQPString, AMQPType> m1, Map<AMQPType, AMQPType> m2){
385 Set e1 = m1.entrySet();
386 Set e2 = m2.entrySet();
387 assertTrue(e1.containsAll(e2));
388 assertTrue(e2.containsAll(e1));
// Builds a two-entry map (key1 -> value1, key2 -> value2) with AMQPString keys
// and values. The return statement is not visible in this view.
391 private Map<AMQPString, AMQPType> annotationMap() throws IOException {
392 Map<AMQPString, AMQPType> annotations = new HashMap<AMQPString, AMQPType>();
393 // the spec allows keys to be symbol or ulong only, but the library only allows string
394 annotations.put(new AMQPString("key1"), new AMQPString("value1"));
395 annotations.put(new AMQPString("key2"), new AMQPString("value2"));
// Strategy used by decorationTest(): one method writes a map into a section of
// a message, the other reads that map back from a message.
399 private interface DecorationProtocol {
// Stores `m` in `msg`; the implementations in this file wrap it in a
// MessageAnnotations or a Footer section.
400 void decorateMessage(AMQPMessage msg, Map<AMQPString, AMQPType> m) throws IOException;
// Returns the map held by the same section of `msg`. The parameter was
// previously named `_`, which is a keyword since Java SE 9 and no longer
// compiles; implementers are unaffected by the rename.
401 Map<AMQPType, AMQPType> getDecoration(AMQPMessage msg) throws IOException;