1 -module(rabbit_exchange_type_recent_history_test).
5 -include_lib("eunit/include/eunit.hrl").
6 -include_lib("amqp_client/include/amqp_client.hrl").
7 -include("rabbit_recent_history.hrl").
9 -define(RABBIT, {"rabbit", 5672}).
10 -define(HARE, {"hare", 5673}).
12 -import(rabbit_exchange_type_recent_history_test_util,
13 [start_other_node/1, cluster_other_node/2,
17 ok = eunit:test(tests(?MODULE, 60), [verbose]).
%% Default-length case: no exchange arguments are supplied (empty list).
%% The visible argument tail passes the queue list Qs, the literal 100 and
%% an expected total of length(Qs) * ?KEEP_NB.
%% NOTE(review): the call these arguments belong to and the binding of Qs
%% fall on lines not visible in this excerpt; presumably test0/6 -- confirm.
19 default_length_test() ->
%% Publish method aimed at the exchange named <<"e">>.
22 #'basic.publish'{exchange = <<"e">>}
%% Message with default basic properties and an empty payload.
25 #amqp_msg{props = #'P_basic'{}, payload = <<>>}
26 end, [], Qs, 100, length(Qs) * ?KEEP_NB).
%% Explicit-length case: the argument list sets x-recent-history-length to
%% 30 (typed as `long'). The visible argument tail also passes the queue
%% list Qs, the literal 100 and an expected total of length(Qs) * 30.
%% NOTE(review): the call these arguments belong to and the binding of Qs
%% fall on lines not visible in this excerpt; presumably test0/6 -- confirm.
28 length_argument_test() ->
%% Publish method aimed at the exchange named <<"e">>.
31 #'basic.publish'{exchange = <<"e">>}
%% Message with default basic properties and an empty payload.
34 #amqp_msg{props = #'P_basic'{}, payload = <<>>}
35 end, [{<<"x-recent-history-length">>, long, 30}], Qs, 100, length(Qs) * 30).
%% Runs the invalid-length check once per value, in order: a negative
%% length (-30) first, then zero. The result of the last
%% wrong_argument_type_test0/1 call is returned; an exception raised by an
%% earlier call stops the run before later values are tried.
wrong_argument_type_test() ->
    InvalidLengths = [-30, 0],
    lists:foldl(fun(Length, _Previous) ->
                        wrong_argument_type_test0(Length)
                end, ok, InvalidLengths).
45 #'basic.publish'{exchange = <<"e">>}
48 H = [{<<"x-recent-history-no-store">>, bool, true}],
49 #amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
54 {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
55 {ok, Chan} = amqp_connection:open_channel(Conn),
57 #'exchange.declare_ok'{} =
58 amqp_channel:call(Chan,
61 type = <<"x-recent-history">>,
65 #'exchange.declare_ok'{} =
66 amqp_channel:call(Chan,
73 #'queue.declare_ok'{queue = Q} =
74 amqp_channel:call(Chan, #'queue.declare' {
79 amqp_channel:call(Chan, #'queue.bind' {
85 #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
86 [amqp_channel:call(Chan,
87 #'basic.publish'{exchange = <<"e1">>},
88 #amqp_msg{props = #'P_basic'{}, payload = <<>>}) ||
89 _ <- lists:duplicate(MsgCount, const)],
90 amqp_channel:call(Chan, #'tx.commit'{}),
92 amqp_channel:call(Chan,
95 destination = <<"e2">>,
99 #'queue.declare_ok'{message_count = Count, queue = Q} =
100 amqp_channel:call(Chan, #'queue.declare' {
105 ?assertEqual(MsgCount, Count),
107 amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e1">> }),
108 amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e2">> }),
109 amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
110 amqp_channel:close(Chan),
111 amqp_connection:close(Conn),
115 start_other_node(?HARE),
116 cluster_other_node(?HARE, ?RABBIT),
118 {ok, Conn} = amqp_connection:start(#amqp_params_network{port=5673}),
119 {ok, Chan} = amqp_connection:open_channel(Conn),
121 #'exchange.declare_ok'{} =
122 amqp_channel:call(Chan,
123 #'exchange.declare' {
125 type = <<"x-recent-history">>,
129 #'queue.declare_ok'{queue = Q} =
130 amqp_channel:call(Chan, #'queue.declare' {
135 amqp_channel:call(Chan, #'queue.bind' {
141 amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
142 amqp_channel:close(Chan),
143 amqp_connection:close(Conn),
144 stop_other_node(?HARE),
146 {ok, Conn2} = amqp_connection:start(#amqp_params_network{}),
147 {ok, Chan2} = amqp_connection:open_channel(Conn2),
149 #'queue.declare_ok'{queue = Q2} =
150 amqp_channel:call(Chan2, #'queue.declare' {
155 amqp_channel:call(Chan2, #'queue.bind' {
161 amqp_channel:call(Chan2, #'exchange.delete' { exchange = <<"e2">> }),
162 amqp_channel:call(Chan2, #'queue.delete' { queue = Q2 }),
163 amqp_channel:close(Chan2),
164 amqp_connection:close(Conn2),
%% Shared driver taking the method/message constructors, the exchange
%% declare arguments, the queue names, the number of messages and the
%% expected total message count.
%% Visible steps: open a connection and a channel, declare an exchange of
%% type <<"x-recent-history">> with DeclareArgs, issue one channel call
%% carrying MakeMsg() per message (MsgCount times) between tx.select and
%% tx.commit, declare and bind every queue in Queues, read each queue's
%% message_count, assert that the counts sum to ExpectedCount, then delete
%% exchange <<"e">> and the queues and close the channel and connection.
%% NOTE(review): the use of MakeMethod, the binding of Counts and the end
%% of the function fall on lines not visible in this excerpt -- confirm
%% against the full file.
167 test0(MakeMethod, MakeMsg, DeclareArgs, Queues, MsgCount, ExpectedCount) ->
168 {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
169 {ok, Chan} = amqp_connection:open_channel(Conn),
%% Declare the exchange under test; the match fails unless the broker
%% answers with exchange.declare_ok.
170 #'exchange.declare_ok'{} =
171 amqp_channel:call(Chan,
172 #'exchange.declare' {
174 type = <<"x-recent-history">>,
176 arguments = DeclareArgs
%% Send the messages inside a transaction, before any queue is declared
%% or bound below.
179 #'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
180 [amqp_channel:call(Chan,
182 MakeMsg()) || _ <- lists:duplicate(MsgCount, const)],
183 amqp_channel:call(Chan, #'tx.commit'{}),
%% Declare each queue as exclusive.
185 [#'queue.declare_ok'{} =
186 amqp_channel:call(Chan, #'queue.declare' {
187 queue = Q, exclusive = true }) || Q <- Queues],
%% Bind each queue with an empty routing key.
189 [#'queue.bind_ok'{} =
190 amqp_channel:call(Chan, #'queue.bind' { queue = Q,
192 routing_key = <<"">>})
%% Declare the queue again to read its current message_count.
197 #'queue.declare_ok'{message_count = M} =
198 amqp_channel:call(Chan, #'queue.declare' {queue = Q,
204 ?assertEqual(ExpectedCount, lists:sum(Counts)),
%% Clean up broker state and release the channel and connection.
206 amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e">> }),
207 [amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues],
208 amqp_channel:close(Chan),
209 amqp_connection:close(Conn),
%% Asserts that declaring an <<"x-recent-history">> exchange whose
%% x-recent-history-length argument is Length (sent as a `long') makes
%% amqp_channel:call/2 exit. The connection is closed afterwards.
%% Length is the value placed in the length argument; the caller visible
%% in this file passes -30 and 0.
%% NOTE(review): the remaining exchange.declare fields and the end of the
%% function fall on lines not visible in this excerpt.
212 wrong_argument_type_test0(Length) ->
213 {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
214 {ok, Chan} = amqp_connection:open_channel(Conn),
215 DeclareArgs = [{<<"x-recent-history-length">>, long, Length}],
%% NOTE(review): presumably traps exits so that a linked process dying
%% during the failed declare does not take the test process down -- confirm.
216 process_flag(trap_exit, true),
%% Any exit reason is accepted (wildcard pattern).
217 ?assertExit(_, amqp_channel:call(Chan,
218 #'exchange.declare' {
220 type = <<"x-recent-history">>,
222 arguments = DeclareArgs
224 amqp_connection:close(Conn),
228 [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>].
%% Builds an EUnit `foreach' fixture containing one timed test per exported
%% zero-arity function of Module whose name ends in "_test".
%%
%% Module  - module whose exports are scanned.
%% Timeout - value placed in each {timeout, Timeout, Fun} test wrapper.
%%
%% Returns {foreach, SetupFun, Tests}; SetupFun is a no-op returning `ok'.
%%
%% Only arity-0 exports are selected, because each test is invoked as
%% Module:F(); an exported *_test function of any other arity would
%% otherwise fail with `undef'. The suffix check uses lists:suffix/2 in
%% place of the obsolete string:right/2; names shorter than the suffix
%% simply do not match, as before.
tests(Module, Timeout) ->
    {foreach, fun() -> ok end,
     [{timeout, Timeout, fun() -> Module:F() end} ||
         {F, 0} <- Module:module_info(exports),
         lists:suffix("_test", atom_to_list(F))]}.