%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Consistent Hash Exchange.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
17 -module(rabbit_exchange_type_consistent_hash_test).
19 -include_lib("amqp_client/include/amqp_client.hrl").
20 -include_lib("eunit/include/eunit.hrl").
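%% Because the routing is probabilistic, we can't really test a great
%% deal here: only that every published message ends up in some queue
%% and that no queue receives a grossly unfair share.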
%% NOTE(review): the clause head that owns this body is not visible in this
%% view, so the name of the entry point cannot be confirmed from here.
%% The body calls t/1 twice in sequence, each time with four queue names
%% that do not overlap with the other call's names. The result of the
%% second call is the result of the whole body.
26 %% Run the test twice to test we clean up correctly
27 t([<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>]),
28 t([<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>]).
%% Runs every scenario in this module once, in a fixed order.
%%
%% Qs is the list of queue names handed on to the scenarios that take
%% one; the remaining scenarios use their own fixed names.
%%
%% Each scenario must return ok: every call is matched against ok, so the
%% first scenario that returns anything else aborts the run with a
%% badmatch. Returns ok when all scenarios have passed.
t(Qs) ->
    ok = test_with_rk(Qs),
    ok = test_with_header(Qs),
    ok = test_binding_with_negative_routing_key(),
    ok = test_binding_with_non_numeric_routing_key(),
    ok = test_with_correlation_id(Qs),
    ok = test_with_message_id(Qs),
    ok = test_with_timestamp(Qs),
    ok = test_non_supported_property(),
    ok = test_mutually_exclusive_arguments(),
    ok.
%% NOTE(review): the lines surrounding these two expressions (the clause
%% head, the fun wrappers and the trailing arguments) are not visible in
%% this view; presumably this is the body of test_with_rk/1 - confirm.
%%
%% First visible expression: a basic.publish addressed to exchange "e"
%% whose routing key is a fresh rnd() value.
44 #'basic.publish'{exchange = <<"e">>, routing_key = rnd()}
%% Second visible expression: a message with default properties and an
%% empty payload.
47 #amqp_msg{props = #'P_basic'{}, payload = <<>>}
%% Runs the shared scenario driver test0/4 with the exchange declared with
%% a "hash-header" argument that names the "hashme" header.
%%
%% Every message is published to exchange "e" without an explicit routing
%% key and carries a fresh rnd() value in its "hashme" header.
%%
%% Qs is passed through to test0/4 unchanged (test0/4 matches it against a
%% four-element list). Returns whatever test0/4 returns; callers match the
%% result against ok.
test_with_header(Qs) ->
    test0(fun () ->
                  #'basic.publish'{exchange = <<"e">>}
          end,
          fun () ->
                  H = [{<<"hashme">>, longstr, rnd()}],
                  #amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
          end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
%% Runs the shared scenario driver test0/4 with the exchange declared with
%% a "hash-property" argument set to "correlation_id".
%%
%% Every message is published to exchange "e" without an explicit routing
%% key and carries a fresh rnd() value in its correlation_id property.
%%
%% Qs is passed through to test0/4 unchanged. Returns whatever test0/4
%% returns; callers match the result against ok.
test_with_correlation_id(Qs) ->
    test0(fun () ->
                  #'basic.publish'{exchange = <<"e">>}
          end,
          fun () ->
                  #amqp_msg{props = #'P_basic'{correlation_id = rnd()}, payload = <<>>}
          end, [{<<"hash-property">>, longstr, <<"correlation_id">>}], Qs).
%% Runs the shared scenario driver test0/4 with the exchange declared with
%% a "hash-property" argument set to "message_id".
%%
%% Every message is published to exchange "e" without an explicit routing
%% key and carries a fresh rnd() value in its message_id property.
%%
%% Qs is passed through to test0/4 unchanged. Returns whatever test0/4
%% returns; callers match the result against ok.
test_with_message_id(Qs) ->
    test0(fun () ->
                  #'basic.publish'{exchange = <<"e">>}
          end,
          fun () ->
                  #amqp_msg{props = #'P_basic'{message_id = rnd()}, payload = <<>>}
          end, [{<<"hash-property">>, longstr, <<"message_id">>}], Qs).
%% Runs the shared scenario driver test0/4 with the exchange declared with
%% a "hash-property" argument set to "timestamp".
%%
%% Every message is published to exchange "e" without an explicit routing
%% key and carries a fresh rndint() value (an integer, not a binary) in its
%% timestamp property.
%%
%% Qs is passed through to test0/4 unchanged. Returns whatever test0/4
%% returns; callers match the result against ok.
test_with_timestamp(Qs) ->
    test0(fun () ->
                  #'basic.publish'{exchange = <<"e">>}
          end,
          fun () ->
                  #amqp_msg{props = #'P_basic'{timestamp = rndint()}, payload = <<>>}
          end, [{<<"hash-property">>, longstr, <<"timestamp">>}], Qs).
%% Checks that declaring an x-consistent-hash exchange with both a
%% "hash-header" and a "hash-property" argument is refused.
%%
%% The test passes when the declare call exits (any exit reason is
%% accepted); if the call returns normally, ?assertExit fails the test.
%% Returns ok.
test_mutually_exclusive_arguments() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    %% Trap exits before provoking the failure.
    %% NOTE(review): presumably this keeps the exit of a linked channel
    %% process from killing the test process - confirm. The flag is left
    %% set on the calling process afterwards.
    process_flag(trap_exit, true),
    Cmd = #'exchange.declare'{
             exchange  = <<"fail">>,
             type      = <<"x-consistent-hash">>,
             arguments = [{<<"hash-header">>, longstr, <<"foo">>},
                          {<<"hash-property">>, longstr, <<"bar">>}]
            },
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    amqp_connection:close(Conn),
    ok.
%% Checks that declaring an x-consistent-hash exchange whose
%% "hash-property" argument is "app_id" is refused.
%%
%% The test passes when the declare call exits (any exit reason is
%% accepted); if the call returns normally, ?assertExit fails the test.
%% Returns ok.
test_non_supported_property() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    %% Trap exits before provoking the failure.
    %% NOTE(review): presumably this keeps the exit of a linked channel
    %% process from killing the test process - confirm. The flag is left
    %% set on the calling process afterwards.
    process_flag(trap_exit, true),
    Cmd = #'exchange.declare'{
             exchange  = <<"fail">>,
             type      = <<"x-consistent-hash">>,
             arguments = [{<<"hash-property">>, longstr, <<"app_id">>}]
            },
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    amqp_connection:close(Conn),
    ok.
%% Returns a fresh rndint() value rendered as a decimal binary,
%% e.g. <<"421337">>. Used wherever a random binary is needed (routing
%% keys, header values, string-typed message properties).
rnd() ->
    integer_to_binary(rndint()).
%% Returns a pseudo-random integer N with 1 =< N =< 1000000.
%%
%% Uses rand rather than the deprecated random module: random falls back
%% to a fixed default seed when the calling process has not seeded it,
%% which makes every run draw the same sequence, whereas rand seeds
%% itself on first use.
rndint() ->
    rand:uniform(1000000).
%% Shared driver for the message-distribution scenarios.
%%
%%   MakeMethod  - NOTE(review): its use is not visible in this view;
%%                 presumably it supplies the publish method - confirm.
%%   MakeMsg     - zero-argument fun, called once per published message.
%%   DeclareArgs - used as the `arguments` of the exchange declaration.
%%   Queues      - must be a list of exactly four queue names (the head
%%                 also binds them individually as Q1..Q4).
%%
%% NOTE(review): several lines of this function are not visible in this
%% view (among them the binding of Count, the generators of the two bind
%% comprehensions, and the expression that builds Counts); the comments
%% below describe only what the visible lines show.
117 test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
120 {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
121 {ok, Chan} = amqp_connection:open_channel(Conn),
%% Declare the exchange under test; anything but declare_ok is a badmatch.
122 #'exchange.declare_ok'{} =
123 amqp_channel:call(Chan,
124 #'exchange.declare' {
126 type = <<"x-consistent-hash">>,
128 arguments = DeclareArgs
%% Declare every queue in Queues as exclusive.
130 [#'queue.declare_ok'{} =
131 amqp_channel:call(Chan, #'queue.declare' {
132 queue = Q, exclusive = true }) || Q <- Queues],
%% Two groups of bindings, one with routing key "10" and one with "20".
133 [#'queue.bind_ok'{} =
134 amqp_channel:call(Chan, #'queue.bind' {queue = Q,
136 routing_key = <<"10">>})
138 [#'queue.bind_ok'{} =
139 amqp_channel:call(Chan, #'queue.bind' {queue = Q,
141 routing_key = <<"20">>})
%% Publish Count messages inside a transaction (tx.select ... tx.commit).
143 #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
144 [amqp_channel:call(Chan,
146 MakeMsg()) || _ <- lists:duplicate(Count, const)],
147 amqp_channel:call(Chan, #'tx.commit'{}),
%% Re-issue queue.declare to read the queue's message_count.
150 #'queue.declare_ok'{message_count = M} =
151 amqp_channel:call(Chan, #'queue.declare' {queue = Q,
%% Count is already bound at this point (it is used by lists:duplicate/2
%% above), so this match is an assertion, not an assignment.
155 Count = lists:sum(Counts), %% All messages got routed
%% Every queue must hold more than 1% of the published messages.
156 [true = C > 0.01 * Count || C <- Counts], %% We are not *grossly* unfair
%% Tear down: delete exchange "e" and every queue, then close the channel
%% and the connection.
157 amqp_channel:call(Chan, #'exchange.delete' {exchange = <<"e">>}),
158 [amqp_channel:call(Chan, #'queue.delete' {queue = Q}) || Q <- Queues],
159 amqp_channel:close(Chan),
160 amqp_connection:close(Conn),
%% Checks that binding to an x-consistent-hash exchange with the routing
%% key <<"-1">> is refused.
%%
%% Declares exchange "bind-fail" and queue "test-queue", then issues the
%% bind. The test passes when the bind call exits (any exit reason is
%% accepted); if it returns normally, ?assertExit fails the test.
%% Afterwards the queue and the exchange are deleted so that nothing is
%% left behind for later scenarios. Returns ok.
test_binding_with_negative_routing_key() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
                                   type = <<"x-consistent-hash">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
    Q = <<"test-queue">>,
    Declare2 = #'queue.declare'{queue = Q},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
    %% NOTE(review): presumably this keeps the exit of a linked channel
    %% process from killing the test process - confirm. The flag is left
    %% set on the calling process afterwards.
    process_flag(trap_exit, true),
    %% NOTE(review): the bind does not name a queue; presumably it relies
    %% on the record's default queue value resolving to the queue declared
    %% just above on this channel - confirm.
    Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
                        routing_key = <<"-1">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    %% Cleanup goes through a fresh channel on the same connection.
    %% NOTE(review): presumably Chan is unusable after the failed bind.
    {ok, Ch2} = amqp_connection:open_channel(Conn),
    amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
    amqp_channel:call(Ch2, #'exchange.delete'{exchange = <<"bind-fail">>}),
    amqp_connection:close(Conn),
    ok.
%% Checks that binding to an x-consistent-hash exchange with the routing
%% key <<"not-a-number">> is refused.
%%
%% Declares exchange "bind-fail" and queue "test-queue", then issues the
%% bind. The test passes when the bind call exits (any exit reason is
%% accepted); if it returns normally, ?assertExit fails the test.
%% Afterwards the queue and the exchange are deleted so that nothing is
%% left behind for later scenarios. Returns ok.
test_binding_with_non_numeric_routing_key() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
                                   type = <<"x-consistent-hash">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
    Q = <<"test-queue">>,
    Declare2 = #'queue.declare'{queue = Q},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
    %% NOTE(review): presumably this keeps the exit of a linked channel
    %% process from killing the test process - confirm. The flag is left
    %% set on the calling process afterwards.
    process_flag(trap_exit, true),
    %% NOTE(review): the bind does not name a queue; presumably it relies
    %% on the record's default queue value resolving to the queue declared
    %% just above on this channel - confirm.
    Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
                        routing_key = <<"not-a-number">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    %% Cleanup goes through a fresh channel on the same connection.
    %% NOTE(review): presumably Chan is unusable after the failed bind.
    {ok, Ch2} = amqp_connection:open_channel(Conn),
    amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
    amqp_channel:call(Ch2, #'exchange.delete'{exchange = <<"bind-fail">>}),
    amqp_connection:close(Conn),
    ok.