]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
d13b94a9688c3669f2203d8a92458ea49ee1fc9f
[packages/trusty/rabbitmq-server.git] /
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2007-2016 Pivotal Software, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_exchange_type_consistent_hash_SUITE).
18
19 -compile(export_all).
20
21 -include_lib("common_test/include/ct.hrl").
22 -include_lib("amqp_client/include/amqp_client.hrl").
23 -include_lib("eunit/include/eunit.hrl").
24
%% Common Test entry point: the suite consists of a single group.
all() ->
    [{group, non_parallel_tests}].
29
%% Group definitions: `non_parallel_tests' is declared with an empty
%% property list and contains the single test case `routing_test'.
groups() ->
    Cases = [routing_test],
    [{non_parallel_tests, [], Cases}].
36
37 %% -------------------------------------------------------------------
38 %% Test suite setup/teardown
39 %% -------------------------------------------------------------------
40
%% Suite-wide setup. Logs the environment, stores this module's name in the
%% config under `rmq_nodename_suffix', then runs the broker setup steps
%% followed by the client setup steps. Returns whatever
%% rabbit_ct_helpers:run_setup_steps/2 returns.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteConfig = rabbit_ct_helpers:set_config(
                    Config, [{rmq_nodename_suffix, ?MODULE}]),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(SuiteConfig, SetupSteps).
49
%% Suite-wide teardown: runs the client teardown steps first, then the
%% broker teardown steps (the reverse of the order used during setup).
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
54
%% No per-group setup is needed; the config is passed through unchanged.
init_per_group(_GroupName, Config) ->
    Config.
57
%% No per-group teardown is needed; the config is passed through unchanged.
end_per_group(_GroupName, Config) ->
    Config.
60
%% Per-testcase setup: delegates to rabbit_ct_helpers:testcase_started/2
%% and returns its result as the config for the test case.
init_per_testcase(Testcase, Config) ->
    StartedConfig = rabbit_ct_helpers:testcase_started(Config, Testcase),
    StartedConfig.
63
%% Per-testcase teardown: delegates to rabbit_ct_helpers:testcase_finished/2
%% and returns its result.
end_per_testcase(Testcase, Config) ->
    FinishedConfig = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    FinishedConfig.
66
67 %% -------------------------------------------------------------------
68 %% Test cases
69 %% -------------------------------------------------------------------
70
%% The suite's only test case. Runs the full routing scenario twice, each
%% time with its own set of four queue names, and returns `passed'.
routing_test(Config) ->
    %% Run the scenario twice to check that the first run cleans up correctly
    QueueSets = [[<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
                 [<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>]],
    lists:foreach(fun(Qs) -> routing_test0(Config, Qs) end, QueueSets),

    passed.
77
%% Runs every scenario once, in a fixed order, against the queue names in
%% Qs. Each step must return `ok'; any other result crashes with badmatch.
routing_test0(Config, Qs) ->
    Steps = [fun() -> test_with_rk(Config, Qs) end,
             fun() -> test_with_header(Config, Qs) end,
             fun() -> test_binding_with_negative_routing_key(Config) end,
             fun() -> test_binding_with_non_numeric_routing_key(Config) end,
             fun() -> test_with_correlation_id(Config, Qs) end,
             fun() -> test_with_message_id(Config, Qs) end,
             fun() -> test_with_timestamp(Config, Qs) end,
             fun() -> test_non_supported_property(Config) end,
             fun() -> test_mutually_exclusive_arguments(Config) end],
    lists:foreach(fun(Step) -> ok = Step() end, Steps),
    ok.
89
90 %% -------------------------------------------------------------------
91 %% Implementation
92 %% -------------------------------------------------------------------
93
%% Routing scenario where each message is published to exchange "e" with a
%% random routing key; the exchange is declared with no extra arguments.
test_with_rk(Config, Qs) ->
    MakePublish = fun() ->
                          #'basic.publish'{exchange    = <<"e">>,
                                           routing_key = rnd()}
                  end,
    MakeMessage = fun() ->
                          #amqp_msg{props = #'P_basic'{}, payload = <<>>}
                  end,
    test0(Config, MakePublish, MakeMessage, [], Qs).
101
%% Routing scenario where the exchange is declared with the "hash-header"
%% argument set to "hashme" and every message carries a random "hashme"
%% header. The publish method itself sets no routing key.
test_with_header(Config, Qs) ->
    MakePublish = fun() ->
                          #'basic.publish'{exchange = <<"e">>}
                  end,
    MakeMessage = fun() ->
                          Headers = [{<<"hashme">>, longstr, rnd()}],
                          #amqp_msg{props   = #'P_basic'{headers = Headers},
                                    payload = <<>>}
                  end,
    ExchangeArgs = [{<<"hash-header">>, longstr, <<"hashme">>}],
    test0(Config, MakePublish, MakeMessage, ExchangeArgs, Qs).
110
111
%% Routing scenario where the exchange is declared with "hash-property" set
%% to "correlation_id" and every message gets a random correlation_id.
test_with_correlation_id(Config, Qs) ->
    MakePublish = fun() ->
                          #'basic.publish'{exchange = <<"e">>}
                  end,
    MakeMessage = fun() ->
                          Props = #'P_basic'{correlation_id = rnd()},
                          #amqp_msg{props = Props, payload = <<>>}
                  end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"correlation_id">>}],
    test0(Config, MakePublish, MakeMessage, ExchangeArgs, Qs).
119
%% Routing scenario where the exchange is declared with "hash-property" set
%% to "message_id" and every message gets a random message_id.
test_with_message_id(Config, Qs) ->
    MakePublish = fun() ->
                          #'basic.publish'{exchange = <<"e">>}
                  end,
    MakeMessage = fun() ->
                          Props = #'P_basic'{message_id = rnd()},
                          #amqp_msg{props = Props, payload = <<>>}
                  end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"message_id">>}],
    test0(Config, MakePublish, MakeMessage, ExchangeArgs, Qs).
127
%% Routing scenario where the exchange is declared with "hash-property" set
%% to "timestamp" and every message gets a random integer timestamp.
test_with_timestamp(Config, Qs) ->
    MakePublish = fun() ->
                          #'basic.publish'{exchange = <<"e">>}
                  end,
    MakeMessage = fun() ->
                          Props = #'P_basic'{timestamp = rndint()},
                          #amqp_msg{props = Props, payload = <<>>}
                  end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"timestamp">>}],
    test0(Config, MakePublish, MakeMessage, ExchangeArgs, Qs).
135
%% Declaring an x-consistent-hash exchange with both "hash-header" and
%% "hash-property" arguments must make the declare call exit.
test_mutually_exclusive_arguments(Config) ->
    Chan = rabbit_ct_client_helpers:open_channel(Config, 0),

    %% NOTE(review): presumably set so an exit signal from the failing
    %% channel does not kill the test process -- confirm. The flag is not
    %% restored afterwards.
    process_flag(trap_exit, true),
    ConflictingArgs = [{<<"hash-header">>, longstr, <<"foo">>},
                       {<<"hash-property">>, longstr, <<"bar">>}],
    Cmd = #'exchange.declare'{exchange  = <<"fail">>,
                              type      = <<"x-consistent-hash">>,
                              arguments = ConflictingArgs},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),

    rabbit_ct_client_helpers:close_channel(Chan),
    ok.
150
%% Declaring an x-consistent-hash exchange whose "hash-property" argument is
%% "app_id" must make the declare call exit.
test_non_supported_property(Config) ->
    Chan = rabbit_ct_client_helpers:open_channel(Config, 0),

    %% NOTE(review): presumably set so an exit signal from the failing
    %% channel does not kill the test process -- confirm. The flag is not
    %% restored afterwards.
    process_flag(trap_exit, true),
    UnsupportedArgs = [{<<"hash-property">>, longstr, <<"app_id">>}],
    Cmd = #'exchange.declare'{exchange  = <<"fail">>,
                              type      = <<"x-consistent-hash">>,
                              arguments = UnsupportedArgs},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),

    rabbit_ct_client_helpers:close_channel(Chan),
    ok.
164
%% Returns the decimal text of a random integer from rndint/0 as a binary.
rnd() ->
    integer_to_binary(rndint()).
167
%% Returns the result of rand_compat:uniform/1 called with an upper bound
%% of one million.
rndint() ->
    UpperBound = 1000000,
    rand_compat:uniform(UpperBound).
170
%% Shared routing scenario.
%%
%% Declares the auto-delete x-consistent-hash exchange "e" with DeclareArgs,
%% declares the four exclusive queues in Queues, binds the first two with
%% binding key "10" and the last two with "20", then publishes 10000
%% messages inside a transaction. MakeMethod/0 builds the publish method and
%% MakeMsg/0 builds the message for each publish.
%%
%% After the commit it asserts that the queues' message counts add up to
%% exactly the number published and that every queue holds more than 1% of
%% them. Finally it deletes the exchange and queues, closes the channel and
%% returns `ok'. Any failed expectation crashes with badmatch.
test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
    Count = 10000,
    Exchange = <<"e">>,
    Chan = rabbit_ct_client_helpers:open_channel(Config, 0),

    DeclareExchange = #'exchange.declare'{exchange    = Exchange,
                                          type        = <<"x-consistent-hash">>,
                                          auto_delete = true,
                                          arguments   = DeclareArgs},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, DeclareExchange),

    %% Declaring a queue returns its current message count; the same
    %% declaration is reused below to read the counts after publishing.
    DeclareQueue =
        fun(Q) ->
                amqp_channel:call(Chan, #'queue.declare'{queue     = Q,
                                                         exclusive = true})
        end,
    lists:foreach(fun(Q) -> #'queue.declare_ok'{} = DeclareQueue(Q) end,
                  Queues),

    %% NOTE(review): the plugin presumably treats the numeric binding key as
    %% the queue's weight -- confirm against the plugin documentation.
    Bind =
        fun(Q, BindingKey) ->
                #'queue.bind_ok'{} =
                    amqp_channel:call(Chan,
                                      #'queue.bind'{queue       = Q,
                                                    exchange    = Exchange,
                                                    routing_key = BindingKey})
        end,
    lists:foreach(fun(Q) -> Bind(Q, <<"10">>) end, [Q1, Q2]),
    lists:foreach(fun(Q) -> Bind(Q, <<"20">>) end, [Q3, Q4]),

    #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
    lists:foreach(fun(_) ->
                          amqp_channel:call(Chan, MakeMethod(), MakeMsg())
                  end,
                  lists:seq(1, Count)),
    amqp_channel:call(Chan, #'tx.commit'{}),

    Counts = [begin
                  #'queue.declare_ok'{message_count = M} = DeclareQueue(Q),
                  M
              end || Q <- Queues],
    %% All messages got routed
    Count = lists:sum(Counts),
    %% We are not *grossly* unfair
    lists:foreach(fun(C) -> true = C > 0.01 * Count end, Counts),

    amqp_channel:call(Chan, #'exchange.delete'{exchange = Exchange}),
    lists:foreach(fun(Q) ->
                          amqp_channel:call(Chan, #'queue.delete'{queue = Q})
                  end,
                  Queues),

    rabbit_ct_client_helpers:close_channel(Chan),
    ok.
215
%% Binding to an x-consistent-hash exchange with the binding key "-1" must
%% make the bind call exit. The test queue is deleted afterwards through a
%% second channel, and both channels are closed.
test_binding_with_negative_routing_key(Config) ->
    Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
    Exchange = <<"bind-fail">>,
    Q = <<"test-queue">>,

    #'exchange.declare_ok'{} =
        amqp_channel:call(Chan,
                          #'exchange.declare'{exchange = Exchange,
                                              type = <<"x-consistent-hash">>}),
    #'queue.declare_ok'{} =
        amqp_channel:call(Chan, #'queue.declare'{queue = Q}),

    process_flag(trap_exit, true),
    %% No queue name is given in the bind.
    %% NOTE(review): presumably the broker then uses the queue last declared
    %% on this channel, i.e. Q -- confirm.
    Cmd = #'queue.bind'{exchange = Exchange, routing_key = <<"-1">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),

    %% Cleanup goes through a fresh channel rather than Chan.
    CleanupChan = rabbit_ct_client_helpers:open_channel(Config, 0),
    amqp_channel:call(CleanupChan, #'queue.delete'{queue = Q}),

    rabbit_ct_client_helpers:close_channel(Chan),
    rabbit_ct_client_helpers:close_channel(CleanupChan),
    ok.
235
%% Binding to an x-consistent-hash exchange with the non-numeric binding key
%% "not-a-number" must make the bind call exit. The test queue is deleted
%% afterwards through a second channel, and both channels are closed.
%% Returns `ok'.
test_binding_with_non_numeric_routing_key(Config) ->
    Chan = rabbit_ct_client_helpers:open_channel(Config, 0),

    Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
                                   type = <<"x-consistent-hash">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
    Q = <<"test-queue">>,
    Declare2 = #'queue.declare'{queue = Q},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
    process_flag(trap_exit, true),
    %% No queue name is given in the bind.
    %% NOTE(review): presumably the broker then uses the queue last declared
    %% on this channel, i.e. Q -- confirm.
    Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
                        routing_key = <<"not-a-number">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),

    %% Cleanup goes through a fresh channel rather than Chan.
    Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
    amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),

    rabbit_ct_client_helpers:close_channel(Chan),
    %% Close the cleanup channel as well, as the negative-routing-key test
    %% does; previously it was left open after every run.
    rabbit_ct_client_helpers:close_channel(Ch2),
    ok.