]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
102ecfdbcb08a811108a853af47b65c9016cbde4
[packages/trusty/rabbitmq-server.git] /
1 -module(rabbit_exchange_type_recent_history_test).
2
3 -export([test/0]).
4
5 -include_lib("eunit/include/eunit.hrl").
6 -include_lib("amqp_client/include/amqp_client.hrl").
7 -include("rabbit_recent_history.hrl").
8
9 -define(RABBIT, {"rabbit", 5672}).
10 -define(HARE,   {"hare", 5673}).
11
12 -import(rabbit_exchange_type_recent_history_test_util,
13         [start_other_node/1, cluster_other_node/2,
14          stop_other_node/1]).
15
%% @doc Entry point: builds the EUnit test set for this module via tests/2
%% (timeout value 60 per test) and runs it in verbose mode.
%% Crashes with a badmatch unless eunit:test/2 returns `ok'.
test() ->
    Suite = tests(?MODULE, 60),
    ok = eunit:test(Suite, [verbose]).
18
%% @doc Declares the exchange without arguments, publishes 100 empty
%% messages, and expects the summed message count over all queues from qs/0
%% to equal length(Queues) * ?KEEP_NB.
default_length_test() ->
    Queues = qs(),
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    EmptyMsg = fun() -> #amqp_msg{props = #'P_basic'{}, payload = <<>>} end,
    Expected = length(Queues) * ?KEEP_NB,
    test0(Publish, EmptyMsg, [], Queues, 100, Expected).
27
%% @doc Declares the exchange with `x-recent-history-length' set to 30,
%% publishes 100 empty messages, and expects the summed message count over
%% all queues from qs/0 to equal length(Queues) * 30.
length_argument_test() ->
    Queues = qs(),
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    EmptyMsg = fun() -> #amqp_msg{props = #'P_basic'{}, payload = <<>>} end,
    DeclareArgs = [{<<"x-recent-history-length">>, long, 30}],
    Expected = length(Queues) * 30,
    test0(Publish, EmptyMsg, DeclareArgs, Queues, 100, Expected).
36
%% @doc Runs wrong_argument_type_test0/1 for the history lengths -30 and 0,
%% in that order; each run asserts that the exchange declaration exits.
wrong_argument_type_test() ->
    lists:foreach(fun wrong_argument_type_test0/1, [-30, 0]).
40
41
%% @doc Publishes 100 empty messages that each carry the header
%% `x-recent-history-no-store' = true and expects the queues bound
%% afterwards to hold 0 messages in total.
no_store_test() ->
    Queues = qs(),
    Publish = fun() -> #'basic.publish'{exchange = <<"e">>} end,
    NoStoreMsg =
        fun() ->
                Headers = [{<<"x-recent-history-no-store">>, bool, true}],
                Props = #'P_basic'{headers = Headers},
                #amqp_msg{props = Props, payload = <<>>}
        end,
    test0(Publish, NoStoreMsg, [], Queues, 100, 0).
51
%% @doc Exchange-to-exchange check.
%%
%% Steps, in order:
%%   1. declare the x-recent-history exchange `e1' and the direct exchange
%%      `e2' (both auto_delete);
%%   2. declare queue `q' and bind it to `e2';
%%   3. publish MsgCount empty messages to `e1' inside a transaction;
%%   4. only then bind `e2' to `e1';
%%   5. passively re-declare the queue and assert it holds MsgCount messages.
%% Exchanges, queue, channel and connection are removed/closed at the end.
e2e_test() ->
    MsgCount = 10,
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    HistoryX = <<"e1">>,
    DirectX = <<"e2">>,

    DeclareHistory = #'exchange.declare'{exchange    = HistoryX,
                                         type        = <<"x-recent-history">>,
                                         auto_delete = true},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, DeclareHistory),

    DeclareDirect = #'exchange.declare'{exchange    = DirectX,
                                        type        = <<"direct">>,
                                        auto_delete = true},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, DeclareDirect),

    #'queue.declare_ok'{queue = Q} =
        amqp_channel:call(Channel, #'queue.declare'{queue = <<"q">>}),

    BindQueue = #'queue.bind'{queue       = Q,
                              exchange    = DirectX,
                              routing_key = <<"">>},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, BindQueue),

    %% Publish everything to e1 before e2 is bound to it.
    #'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
    PublishOne =
        fun(_) ->
                amqp_channel:call(Channel,
                                  #'basic.publish'{exchange = HistoryX},
                                  #amqp_msg{props   = #'P_basic'{},
                                            payload = <<>>})
        end,
    lists:foreach(PublishOne, lists:seq(1, MsgCount)),
    amqp_channel:call(Channel, #'tx.commit'{}),

    BindExchange = #'exchange.bind'{source      = HistoryX,
                                    destination = DirectX,
                                    routing_key = <<"">>},
    amqp_channel:call(Channel, BindExchange),

    %% Q is already bound, so this match also checks the queue name.
    #'queue.declare_ok'{message_count = Count, queue = Q} =
        amqp_channel:call(Channel, #'queue.declare'{passive = true,
                                                    queue   = Q}),

    ?assertEqual(MsgCount, Count),

    amqp_channel:call(Channel, #'exchange.delete'{exchange = HistoryX}),
    amqp_channel:call(Channel, #'exchange.delete'{exchange = DirectX}),
    amqp_channel:call(Channel, #'queue.delete'{queue = Q}),
    amqp_channel:close(Channel),
    amqp_connection:close(Connection),
    ok.
113
%% @doc Two-node scenario.
%%
%% Phase 1: start_other_node/1 and cluster_other_node/2 are called for
%% ?HARE / ?RABBIT, then a connection on port 5673 declares the
%% x-recent-history exchange `e1' (auto_delete = false), binds queue `q'
%% to it and deletes the queue again. The connection is closed and
%% stop_other_node/1 is called for ?HARE.
%%
%% Phase 2: a connection with default parameters declares queue `q2' and
%% must still be able to bind it to `e1' (the bind reply is matched).
%%
%% Cleanup removes `e1' and `q2'.
multinode_test() ->
    start_other_node(?HARE),
    cluster_other_node(?HARE, ?RABBIT),

    {ok, Conn} = amqp_connection:start(#amqp_params_network{port = 5673}),
    {ok, Chan} = amqp_connection:open_channel(Conn),

    #'exchange.declare_ok'{} =
        amqp_channel:call(Chan,
                          #'exchange.declare'{
                             exchange    = <<"e1">>,
                             type        = <<"x-recent-history">>,
                             auto_delete = false
                            }),

    #'queue.declare_ok'{queue = Q} =
        amqp_channel:call(Chan, #'queue.declare'{queue = <<"q">>}),

    #'queue.bind_ok'{} =
        amqp_channel:call(Chan, #'queue.bind'{
                                   queue       = Q,
                                   exchange    = <<"e1">>,
                                   routing_key = <<"">>
                                  }),

    amqp_channel:call(Chan, #'queue.delete'{queue = Q}),
    amqp_channel:close(Chan),
    amqp_connection:close(Conn),
    stop_other_node(?HARE),

    {ok, Conn2} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan2} = amqp_connection:open_channel(Conn2),

    #'queue.declare_ok'{queue = Q2} =
        amqp_channel:call(Chan2, #'queue.declare'{queue = <<"q2">>}),

    #'queue.bind_ok'{} =
        amqp_channel:call(Chan2, #'queue.bind'{
                                    queue       = Q2,
                                    exchange    = <<"e1">>,
                                    routing_key = <<"">>
                                   }),

    %% Remove the exchange this test actually declared. It was created with
    %% auto_delete = false, so nothing else will remove it, and a leftover
    %% `e1' would conflict with other tests that declare `e1' with
    %% different properties.
    amqp_channel:call(Chan2, #'exchange.delete'{exchange = <<"e1">>}),
    amqp_channel:call(Chan2, #'queue.delete'{queue = Q2}),
    amqp_channel:close(Chan2),
    amqp_connection:close(Conn2),
    ok.
166
%% @doc Shared scenario used by the length/no-store tests.
%%
%% MakeMethod    - fun/0 returning the method record used for each publish.
%% MakeMsg       - fun/0 returning the message published each time.
%% DeclareArgs   - arguments passed in the `exchange.declare' for `e'.
%% Queues        - queue names declared (exclusive) and bound to `e'
%%                 after all publishing is done.
%% MsgCount      - number of publishes performed inside one transaction.
%% ExpectedCount - expected sum of the message counts of all Queues.
%%
%% Returns `ok'; fails through ?assertEqual or a badmatch otherwise.
test0(MakeMethod, MakeMsg, DeclareArgs, Queues, MsgCount, ExpectedCount) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    Declare = #'exchange.declare'{exchange    = <<"e">>,
                                  type        = <<"x-recent-history">>,
                                  auto_delete = true,
                                  arguments   = DeclareArgs},
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare),

    %% Publish first; the queues are only created and bound afterwards.
    #'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
    lists:foreach(
      fun(_) ->
              amqp_channel:call(Channel, MakeMethod(), MakeMsg())
      end,
      lists:seq(1, MsgCount)),
    amqp_channel:call(Channel, #'tx.commit'{}),

    lists:foreach(
      fun(Name) ->
              #'queue.declare_ok'{} =
                  amqp_channel:call(Channel,
                                    #'queue.declare'{queue     = Name,
                                                     exclusive = true})
      end,
      Queues),

    lists:foreach(
      fun(Name) ->
              #'queue.bind_ok'{} =
                  amqp_channel:call(Channel,
                                    #'queue.bind'{queue       = Name,
                                                  exchange    = <<"e">>,
                                                  routing_key = <<"">>})
      end,
      Queues),

    %% Re-declaring each queue yields its current message count.
    CountOf =
        fun(Name) ->
                #'queue.declare_ok'{message_count = N} =
                    amqp_channel:call(Channel,
                                      #'queue.declare'{queue     = Name,
                                                       exclusive = true}),
                N
        end,
    Counts = lists:map(CountOf, Queues),

    ?assertEqual(ExpectedCount, lists:sum(Counts)),

    amqp_channel:call(Channel, #'exchange.delete'{exchange = <<"e">>}),
    lists:foreach(
      fun(Name) ->
              amqp_channel:call(Channel, #'queue.delete'{queue = Name})
      end,
      Queues),
    amqp_channel:close(Channel),
    amqp_connection:close(Connection),
    ok.
211
%% @doc Asserts that declaring the x-recent-history exchange `e' with
%% `x-recent-history-length' = Length makes the channel call exit.
%% trap_exit is switched on in the calling process before the call and is
%% not switched back afterwards. Returns `ok'.
wrong_argument_type_test0(Length) ->
    DeclareArgs = [{<<"x-recent-history-length">>, long, Length}],
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Connection),
    process_flag(trap_exit, true),
    ?assertExit(_,
                amqp_channel:call(Chan,
                                  #'exchange.declare'{
                                     exchange = <<"e">>,
                                     type = <<"x-recent-history">>,
                                     auto_delete = true,
                                     arguments = DeclareArgs
                                    })),
    amqp_connection:close(Connection),
    ok.
226
%% @doc Names of the queues used by the tests: `q0' through `q3', as binaries.
qs() ->
    [list_to_binary([$q | integer_to_list(Index)]) || Index <- [0, 1, 2, 3]].
229
%% @doc Builds an EUnit `foreach' fixture for Module.
%%
%% Every arity-0 export of Module whose name ends in "_test" becomes a
%% `{timeout, Timeout, Fun}' test, where Fun calls that export. The setup
%% fun does nothing and returns `ok'.
%%
%% Only arity-0 exports are selected because the generated fun always calls
%% Module:F() without arguments. The suffix check uses lists:suffix/2
%% rather than the obsolete string:right/2.
tests(Module, Timeout) ->
    Exports = proplists:get_value(exports, Module:module_info()),
    {foreach, fun() -> ok end,
     [{timeout, Timeout, fun() -> Module:F() end}
      || {F, 0} <- Exports,
         lists:suffix("_test", atom_to_list(F))]}.