]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
702fdb46c1c319584c2af5388d2ab6c8dd44b10e
[packages/trusty/rabbitmq-server.git] /
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_exchange_type_consistent_hash_test).
18 -export([test/0]).
19 -include_lib("amqp_client/include/amqp_client.hrl").
20
21 %% Because the routing is probabilistic, we can't really test a great
22 %% deal here.
23
%% Entry point. Runs the full scenario set against two disjoint groups of
%% four queue names, one group after the other; the second pass only
%% succeeds if the first one cleaned up after itself.
test() ->
    QueueSets = [[<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
                 [<<"q4">>, <<"q5">>, <<"q6">>, <<"q7">>]],
    lists:foreach(fun (Qs) -> t(Qs) end, QueueSets).
28
%% Runs both scenarios (hash on routing key, then hash on header) against
%% the queue names in Qs. Each scenario must return ok, otherwise this
%% crashes with a badmatch. Returns ok.
t(Qs) ->
    Scenarios = [fun test_with_rk/1, fun test_with_header/1],
    lists:foreach(fun (Scenario) -> ok = Scenario(Qs) end, Scenarios).
33
%% Scenario: every message is published to exchange "e" with a fresh random
%% routing key, empty properties and an empty payload. The exchange is
%% declared without extra arguments.
test_with_rk(Qs) ->
    MakeMethod = fun () ->
                         #'basic.publish'{exchange    = <<"e">>,
                                          routing_key = rnd()}
                 end,
    MakeMsg = fun () ->
                      #amqp_msg{props = #'P_basic'{}, payload = <<>>}
              end,
    test0(MakeMethod, MakeMsg, [], Qs).
41
%% Scenario: the exchange is declared with the "hash-header" argument set to
%% "hashme", and every message carries a fresh random value in its "hashme"
%% header. No routing key is set on the publish.
test_with_header(Qs) ->
    MakeMethod = fun () -> #'basic.publish'{exchange = <<"e">>} end,
    MakeMsg = fun () ->
                      Headers = [{<<"hashme">>, longstr, rnd()}],
                      #amqp_msg{props   = #'P_basic'{headers = Headers},
                                payload = <<>>}
              end,
    DeclareArgs = [{<<"hash-header">>, longstr, <<"hashme">>}],
    test0(MakeMethod, MakeMsg, DeclareArgs, Qs).
50
%% Returns a uniformly chosen integer in 1..1000000, rendered in decimal as
%% a binary.
%%
%% Uses `rand' instead of the deprecated `random' module. `random' starts
%% from a constant default seed unless seeded explicitly, whereas `rand'
%% seeds itself with a non-constant seed on first use in a process.
rnd() ->
    integer_to_binary(rand:uniform(1000000)).
53
%% Publishes 10000 messages through an "x-consistent-hash" exchange named
%% "e" that is bound to four exclusive queues, then checks how the messages
%% were distributed.
%%
%% MakeMethod  - zero-arity fun returning the #'basic.publish'{} to use for
%%               one message
%% MakeMsg     - zero-arity fun returning the #amqp_msg{} to publish
%% DeclareArgs - arguments passed to the exchange declaration
%% Queues      - exactly four queue names; the first two are bound with
%%               routing key "10", the last two with "20"
%%
%% Returns ok. Any failed expectation crashes with a badmatch; the
%% connection is closed even in that case, so that a failed run does not
%% leave the connection (and the exclusive queues it owns) behind.
test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
    Count = 10000,

    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    try
        {ok, Chan} = amqp_connection:open_channel(Conn),
        #'exchange.declare_ok'{} =
            amqp_channel:call(Chan,
                              #'exchange.declare' {
                                exchange = <<"e">>,
                                type = <<"x-consistent-hash">>,
                                auto_delete = true,
                                arguments = DeclareArgs
                               }),
        lists:foreach(
          fun (Q) ->
                  #'queue.declare_ok'{} =
                      amqp_channel:call(Chan, #'queue.declare' {
                                          queue = Q, exclusive = true })
          end, Queues),
        Bind = fun (Q, Key) ->
                       #'queue.bind_ok'{} =
                           amqp_channel:call(Chan,
                                             #'queue.bind' {
                                               queue = Q,
                                               exchange = <<"e">>,
                                               routing_key = Key })
               end,
        lists:foreach(fun (Q) -> Bind(Q, <<"10">>) end, [Q1, Q2]),
        lists:foreach(fun (Q) -> Bind(Q, <<"20">>) end, [Q3, Q4]),

        %% Publish the whole batch inside a single transaction.
        #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
        lists:foreach(
          fun (_) ->
                  ok = amqp_channel:call(Chan, MakeMethod(), MakeMsg())
          end, lists:seq(1, Count)),
        #'tx.commit_ok'{} = amqp_channel:call(Chan, #'tx.commit'{}),

        %% Re-declaring each queue reports its current message count.
        Counts =
            [begin
                 #'queue.declare_ok'{message_count = M} =
                     amqp_channel:call(Chan, #'queue.declare' {
                                         queue     = Q,
                                         exclusive = true }),
                 M
             end || Q <- Queues],
        Count = lists:sum(Counts), %% All messages got routed
        %% We are not *grossly* unfair: every queue got more than 1%
        true = lists:all(fun (C) -> C > 0.01 * Count end, Counts),

        amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e">> }),
        lists:foreach(
          fun (Q) ->
                  amqp_channel:call(Chan, #'queue.delete' { queue = Q })
          end, Queues),
        amqp_channel:close(Chan),
        ok
    after
        amqp_connection:close(Conn)
    end.