]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
be068b79f930bfaa86c81958ea28ff9833b34a8a
[packages/trusty/rabbitmq-server.git] /
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2007-2016 Pivotal Software, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_exchange_type_consistent_hash_test).
18 -export([test/0]).
19 -include_lib("amqp_client/include/amqp_client.hrl").
20 -include_lib("eunit/include/eunit.hrl").
21
22 %% Because the routing is probabilistic, we can't really test a great
23 %% deal here.
24
%% Entry point. The whole scenario list is run twice, each pass with its own
%% set of four queue names, so that anything left behind by the first pass
%% would show up as a failure in the second.
test() ->
    FirstPass  = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    SecondPass = [<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>],
    lists:foreach(fun t/1, [FirstPass, SecondPass]).
29
%% Runs every scenario once, in a fixed order, asserting that each one
%% returns `ok`. Scenarios that publish messages receive the four queue
%% names in Qs; the rejection scenarios take no arguments.
t(Qs) ->
    Scenarios =
        [fun() -> test_with_rk(Qs) end,
         fun() -> test_with_header(Qs) end,
         fun test_binding_with_negative_routing_key/0,
         fun test_binding_with_non_numeric_routing_key/0,
         fun() -> test_with_correlation_id(Qs) end,
         fun() -> test_with_message_id(Qs) end,
         fun() -> test_with_timestamp(Qs) end,
         fun test_non_supported_property/0,
         fun test_mutually_exclusive_arguments/0],
    lists:foreach(fun(Scenario) -> ok = Scenario() end, Scenarios).
41
%% Publishing scenario in which each message is sent with a fresh random
%% routing key. The message properties are left at their defaults and the
%% exchange is declared without any extra arguments.
test_with_rk(Qs) ->
    Publish = fun() ->
                      #'basic.publish'{exchange = <<"e">>, routing_key = rnd()}
              end,
    Message = fun() ->
                      #amqp_msg{props = #'P_basic'{}, payload = <<>>}
              end,
    ExchangeArgs = [],
    test0(Publish, Message, ExchangeArgs, Qs).
49
%% Publishing scenario in which the exchange is declared with
%% "hash-header" = "hashme" and every message carries a "hashme" header
%% holding a fresh random value. No routing key is set on the publish.
test_with_header(Qs) ->
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    Message = fun() ->
                      Props = #'P_basic'{headers = [{<<"hashme">>, longstr, rnd()}]},
                      #amqp_msg{props = Props, payload = <<>>}
              end,
    ExchangeArgs = [{<<"hash-header">>, longstr, <<"hashme">>}],
    test0(Publish, Message, ExchangeArgs, Qs).
58
59
%% Publishing scenario in which the exchange is declared with
%% "hash-property" = "correlation_id" and every message gets a fresh random
%% correlation_id.
test_with_correlation_id(Qs) ->
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    Message = fun() ->
                      Props = #'P_basic'{correlation_id = rnd()},
                      #amqp_msg{props = Props, payload = <<>>}
              end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"correlation_id">>}],
    test0(Publish, Message, ExchangeArgs, Qs).
67
%% Publishing scenario in which the exchange is declared with
%% "hash-property" = "message_id" and every message gets a fresh random
%% message_id.
test_with_message_id(Qs) ->
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    Message = fun() ->
                      Props = #'P_basic'{message_id = rnd()},
                      #amqp_msg{props = Props, payload = <<>>}
              end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"message_id">>}],
    test0(Publish, Message, ExchangeArgs, Qs).
75
%% Publishing scenario in which the exchange is declared with
%% "hash-property" = "timestamp" and every message gets a fresh random
%% integer timestamp.
test_with_timestamp(Qs) ->
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    Message = fun() ->
                      Props = #'P_basic'{timestamp = rndint()},
                      #amqp_msg{props = Props, payload = <<>>}
              end,
    ExchangeArgs = [{<<"hash-property">>, longstr, <<"timestamp">>}],
    test0(Publish, Message, ExchangeArgs, Qs).
83
%% Rejection scenario: declaring an x-consistent-hash exchange with both a
%% "hash-header" and a "hash-property" argument is expected to make the
%% channel call exit. Returns ok once that exit has been observed and the
%% connection closed.
test_mutually_exclusive_arguments() ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    process_flag(trap_exit, true),
    ConflictingArgs = [{<<"hash-header">>, longstr, <<"foo">>},
                       {<<"hash-property">>, longstr, <<"bar">>}],
    Declare = #'exchange.declare'{exchange  = <<"fail">>,
                                  type      = <<"x-consistent-hash">>,
                                  arguments = ConflictingArgs},
    ?assertExit(_, amqp_channel:call(Channel, Declare)),
    amqp_connection:close(Connection),
    ok.
97
%% Rejection scenario: declaring an x-consistent-hash exchange with
%% "hash-property" = "app_id" is expected to make the channel call exit.
%% Returns ok once that exit has been observed and the connection closed.
test_non_supported_property() ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    process_flag(trap_exit, true),
    UnsupportedArgs = [{<<"hash-property">>, longstr, <<"app_id">>}],
    Declare = #'exchange.declare'{exchange  = <<"fail">>,
                                  type      = <<"x-consistent-hash">>,
                                  arguments = UnsupportedArgs},
    ?assertExit(_, amqp_channel:call(Channel, Declare)),
    amqp_connection:close(Connection),
    ok.
110
%% Returns the decimal text of a random integer from rndint/0 as a binary,
%% e.g. <<"421337">>.
rnd() ->
    integer_to_binary(rndint()).
113
%% Returns a pseudo-random integer N with 1 =< N =< 1000000.
%%
%% Uses `rand` (available from OTP 18) in place of the deprecated `random`
%% module. `rand` seeds itself on first use in a process, so the values are
%% no longer the same fixed sequence in every fresh process.
rndint() ->
    rand:uniform(1000000).
116
%% Shared driver for the publishing scenarios.
%%
%% MakeMethod and MakeMsg are zero-arity funs, each called once per publish
%% to build the method and the message. DeclareArgs are the arguments used
%% to declare the "e" exchange. The last argument must be exactly four queue
%% names: the first two are bound with routing key "10", the last two with
%% "20".
%%
%% 10000 messages are published inside a transaction. Afterwards the
%% per-queue message counts must add up to the number published and each
%% must exceed 1% of it. The exchange and the queues are then deleted and
%% the channel and connection closed. Returns ok.
test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
    Total = 10000,
    Exchange = <<"e">>,

    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    DeclareExchange = #'exchange.declare'{exchange    = Exchange,
                                          type        = <<"x-consistent-hash">>,
                                          auto_delete = true,
                                          arguments   = DeclareArgs},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, DeclareExchange),

    %% The same declare is issued twice per queue: once to create it and
    %% once, after publishing, to read message_count from the reply.
    DeclareQueue =
        fun(Name) ->
                amqp_channel:call(Channel,
                                  #'queue.declare'{queue     = Name,
                                                   exclusive = true})
        end,
    lists:foreach(fun(Name) ->
                          #'queue.declare_ok'{} = DeclareQueue(Name)
                  end, Queues),

    Bind =
        fun(Key, Names) ->
                lists:foreach(
                  fun(Name) ->
                          BindCmd = #'queue.bind'{queue       = Name,
                                                  exchange    = Exchange,
                                                  routing_key = Key},
                          #'queue.bind_ok'{} = amqp_channel:call(Channel, BindCmd)
                  end, Names)
        end,
    Bind(<<"10">>, [Q1, Q2]),
    Bind(<<"20">>, [Q3, Q4]),

    #'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
    lists:foreach(fun(_) ->
                          amqp_channel:call(Channel, MakeMethod(), MakeMsg())
                  end, lists:seq(1, Total)),
    amqp_channel:call(Channel, #'tx.commit'{}),

    PerQueue =
        lists:map(fun(Name) ->
                          #'queue.declare_ok'{message_count = Held} =
                              DeclareQueue(Name),
                          Held
                  end, Queues),
    Total = lists:sum(PerQueue), %% All messages got routed
    %% We are not *grossly* unfair
    lists:foreach(fun(Held) -> true = Held > 0.01 * Total end, PerQueue),

    amqp_channel:call(Channel, #'exchange.delete'{exchange = Exchange}),
    lists:foreach(fun(Name) ->
                          amqp_channel:call(Channel, #'queue.delete'{queue = Name})
                  end, Queues),
    amqp_channel:close(Channel),
    amqp_connection:close(Connection),
    ok.
162
%% Rejection scenario: binding a queue to an x-consistent-hash exchange with
%% the routing key "-1" is expected to make the channel call exit.
%%
%% The "bind-fail" exchange and the "test-queue" queue created here are both
%% deleted before returning, so nothing is left on the broker. Returns ok.
test_binding_with_negative_routing_key() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    X = <<"bind-fail">>,
    Q = <<"test-queue">>,
    Declare1 = #'exchange.declare'{exchange = X,
                                   type = <<"x-consistent-hash">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
    Declare2 = #'queue.declare'{queue = Q},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
    process_flag(trap_exit, true),
    %% Name the queue explicitly instead of relying on the empty queue name
    %% standing for the queue most recently declared on this channel.
    Cmd = #'queue.bind'{queue = Q,
                        exchange = X,
                        routing_key = <<"-1">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    %% Chan is not reused after the failed call; clean up on a new channel.
    {ok, Ch2} = amqp_connection:open_channel(Conn),
    amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
    %% The exchange is not auto_delete, so it has to be removed explicitly.
    amqp_channel:call(Ch2, #'exchange.delete'{exchange = X}),
    amqp_connection:close(Conn),
    ok.
180
%% Rejection scenario: binding a queue to an x-consistent-hash exchange with
%% the routing key "not-a-number" is expected to make the channel call exit.
%%
%% The "bind-fail" exchange and the "test-queue" queue created here are both
%% deleted before returning, so nothing is left on the broker. Returns ok.
test_binding_with_non_numeric_routing_key() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    X = <<"bind-fail">>,
    Q = <<"test-queue">>,
    Declare1 = #'exchange.declare'{exchange = X,
                                   type = <<"x-consistent-hash">>},
    #'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
    Declare2 = #'queue.declare'{queue = Q},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
    process_flag(trap_exit, true),
    %% Name the queue explicitly instead of relying on the empty queue name
    %% standing for the queue most recently declared on this channel.
    Cmd = #'queue.bind'{queue = Q,
                        exchange = X,
                        routing_key = <<"not-a-number">>},
    ?assertExit(_, amqp_channel:call(Chan, Cmd)),
    %% Chan is not reused after the failed call; clean up on a new channel.
    {ok, Ch2} = amqp_connection:open_channel(Conn),
    amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
    %% The exchange is not auto_delete, so it has to be removed explicitly.
    amqp_channel:call(Ch2, #'exchange.delete'{exchange = X}),
    amqp_connection:close(Conn),
    ok.