1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
17 -module(rabbit_exchange_type_consistent_hash_SUITE).
21 -include_lib("common_test/include/ct.hrl").
22 -include_lib("amqp_client/include/amqp_client.hrl").
23 -include_lib("eunit/include/eunit.hrl").
27 {group, non_parallel_tests}
32 {non_parallel_tests, [], [
37 %% -------------------------------------------------------------------
38 %% Test suite setup/teardown
39 %% -------------------------------------------------------------------
41 init_per_suite(Config) ->
42 rabbit_ct_helpers:log_environment(),
43 Config1 = rabbit_ct_helpers:set_config(Config, [
44 {rmq_nodename_suffix, ?MODULE}
46 rabbit_ct_helpers:run_setup_steps(Config1,
47 rabbit_ct_broker_helpers:setup_steps() ++
48 rabbit_ct_client_helpers:setup_steps()).
50 end_per_suite(Config) ->
51 rabbit_ct_helpers:run_teardown_steps(Config,
52 rabbit_ct_client_helpers:teardown_steps() ++
53 rabbit_ct_broker_helpers:teardown_steps()).
55 init_per_group(_, Config) ->
58 end_per_group(_, Config) ->
61 init_per_testcase(Testcase, Config) ->
62 rabbit_ct_helpers:testcase_started(Config, Testcase).
64 end_per_testcase(Testcase, Config) ->
65 rabbit_ct_helpers:testcase_finished(Config, Testcase).
67 %% -------------------------------------------------------------------
69 %% -------------------------------------------------------------------
71 routing_test(Config) ->
72 %% Run the test twice to test we clean up correctly
73 routing_test0(Config, [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>]),
74 routing_test0(Config, [<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>]),
78 routing_test0(Config, Qs) ->
79 ok = test_with_rk(Config, Qs),
80 ok = test_with_header(Config, Qs),
81 ok = test_binding_with_negative_routing_key(Config),
82 ok = test_binding_with_non_numeric_routing_key(Config),
83 ok = test_with_correlation_id(Config, Qs),
84 ok = test_with_message_id(Config, Qs),
85 ok = test_with_timestamp(Config, Qs),
86 ok = test_non_supported_property(Config),
87 ok = test_mutually_exclusive_arguments(Config),
90 %% -------------------------------------------------------------------
92 %% -------------------------------------------------------------------
94 test_with_rk(Config, Qs) ->
95 test0(Config, fun () ->
96 #'basic.publish'{exchange = <<"e">>, routing_key = rnd()}
99 #amqp_msg{props = #'P_basic'{}, payload = <<>>}
102 test_with_header(Config, Qs) ->
103 test0(Config, fun () ->
104 #'basic.publish'{exchange = <<"e">>}
107 H = [{<<"hashme">>, longstr, rnd()}],
108 #amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
109 end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
112 test_with_correlation_id(Config, Qs) ->
113 test0(Config, fun() ->
114 #'basic.publish'{exchange = <<"e">>}
117 #amqp_msg{props = #'P_basic'{correlation_id = rnd()}, payload = <<>>}
118 end, [{<<"hash-property">>, longstr, <<"correlation_id">>}], Qs).
120 test_with_message_id(Config, Qs) ->
121 test0(Config, fun() ->
122 #'basic.publish'{exchange = <<"e">>}
125 #amqp_msg{props = #'P_basic'{message_id = rnd()}, payload = <<>>}
126 end, [{<<"hash-property">>, longstr, <<"message_id">>}], Qs).
128 test_with_timestamp(Config, Qs) ->
129 test0(Config, fun() ->
130 #'basic.publish'{exchange = <<"e">>}
133 #amqp_msg{props = #'P_basic'{timestamp = rndint()}, payload = <<>>}
134 end, [{<<"hash-property">>, longstr, <<"timestamp">>}], Qs).
136 test_mutually_exclusive_arguments(Config) ->
137 Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
139 process_flag(trap_exit, true),
140 Cmd = #'exchange.declare'{
141 exchange = <<"fail">>,
142 type = <<"x-consistent-hash">>,
143 arguments = [{<<"hash-header">>, longstr, <<"foo">>},
144 {<<"hash-property">>, longstr, <<"bar">>}]
146 ?assertExit(_, amqp_channel:call(Chan, Cmd)),
148 rabbit_ct_client_helpers:close_channel(Chan),
151 test_non_supported_property(Config) ->
152 Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
154 process_flag(trap_exit, true),
155 Cmd = #'exchange.declare'{
156 exchange = <<"fail">>,
157 type = <<"x-consistent-hash">>,
158 arguments = [{<<"hash-property">>, longstr, <<"app_id">>}]
160 ?assertExit(_, amqp_channel:call(Chan, Cmd)),
162 rabbit_ct_client_helpers:close_channel(Chan),
166 list_to_binary(integer_to_list(rndint())).
169 rand_compat:uniform(1000000).
171 test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
173 Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
175 #'exchange.declare_ok'{} =
176 amqp_channel:call(Chan,
177 #'exchange.declare' {
179 type = <<"x-consistent-hash">>,
181 arguments = DeclareArgs
183 [#'queue.declare_ok'{} =
184 amqp_channel:call(Chan, #'queue.declare' {
185 queue = Q, exclusive = true }) || Q <- Queues],
186 [#'queue.bind_ok'{} =
187 amqp_channel:call(Chan, #'queue.bind' {queue = Q,
189 routing_key = <<"10">>})
191 [#'queue.bind_ok'{} =
192 amqp_channel:call(Chan, #'queue.bind' {queue = Q,
194 routing_key = <<"20">>})
196 #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
197 [amqp_channel:call(Chan,
199 MakeMsg()) || _ <- lists:duplicate(Count, const)],
200 amqp_channel:call(Chan, #'tx.commit'{}),
203 #'queue.declare_ok'{message_count = M} =
204 amqp_channel:call(Chan, #'queue.declare' {queue = Q,
208 Count = lists:sum(Counts), %% All messages got routed
209 [true = C > 0.01 * Count || C <- Counts], %% We are not *grossly* unfair
210 amqp_channel:call(Chan, #'exchange.delete' {exchange = <<"e">>}),
211 [amqp_channel:call(Chan, #'queue.delete' {queue = Q}) || Q <- Queues],
213 rabbit_ct_client_helpers:close_channel(Chan),
216 test_binding_with_negative_routing_key(Config) ->
217 Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
219 Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
220 type = <<"x-consistent-hash">>},
221 #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
222 Q = <<"test-queue">>,
223 Declare2 = #'queue.declare'{queue = Q},
224 #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
225 process_flag(trap_exit, true),
226 Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
227 routing_key = <<"-1">>},
228 ?assertExit(_, amqp_channel:call(Chan, Cmd)),
229 Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
230 amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
232 rabbit_ct_client_helpers:close_channel(Chan),
233 rabbit_ct_client_helpers:close_channel(Ch2),
236 test_binding_with_non_numeric_routing_key(Config) ->
237 Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
239 Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
240 type = <<"x-consistent-hash">>},
241 #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
242 Q = <<"test-queue">>,
243 Declare2 = #'queue.declare'{queue = Q},
244 #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
245 process_flag(trap_exit, true),
246 Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
247 routing_key = <<"not-a-number">>},
248 ?assertExit(_, amqp_channel:call(Chan, Cmd)),
250 Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
251 amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
253 rabbit_ct_client_helpers:close_channel(Chan),