1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
17 -module(rabbit_exchange_type_consistent_hash_test).
19 -include_lib("amqp_client/include/amqp_client.hrl").
21 %% Because the routing is probabilistic, we can't really test a great
25 %% Run the test twice to test we clean up correctly
26 t([<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>]),
27 t([<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>]).
30 ok = test_with_rk(Qs),
31 ok = test_with_header(Qs),
36 #'basic.publish'{exchange = <<"e">>, routing_key = rnd()}
39 #amqp_msg{props = #'P_basic'{}, payload = <<>>}
42 test_with_header(Qs) ->
44 #'basic.publish'{exchange = <<"e">>}
47 H = [{<<"hashme">>, longstr, rnd()}],
48 #amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
49 end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
52 list_to_binary(integer_to_list(random:uniform(1000000))).
54 test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
57 {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
58 {ok, Chan} = amqp_connection:open_channel(Conn),
59 #'exchange.declare_ok'{} =
60 amqp_channel:call(Chan,
63 type = <<"x-consistent-hash">>,
65 arguments = DeclareArgs
67 [#'queue.declare_ok'{} =
68 amqp_channel:call(Chan, #'queue.declare' {
69 queue = Q, exclusive = true }) || Q <- Queues],
71 amqp_channel:call(Chan, #'queue.bind' { queue = Q,
73 routing_key = <<"10">> })
76 amqp_channel:call(Chan, #'queue.bind' { queue = Q,
78 routing_key = <<"20">> })
80 #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
81 [amqp_channel:call(Chan,
83 MakeMsg()) || _ <- lists:duplicate(Count, const)],
84 amqp_channel:call(Chan, #'tx.commit'{}),
87 #'queue.declare_ok'{message_count = M} =
88 amqp_channel:call(Chan, #'queue.declare' {queue = Q,
92 Count = lists:sum(Counts), %% All messages got routed
93 [true = C > 0.01 * Count || C <- Counts], %% We are not *grossly* unfair
94 amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e">> }),
95 [amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues],
96 amqp_channel:close(Chan),
97 amqp_connection:close(Conn),