%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%

-module(rabbit_node_monitor).

-behaviour(gen_server).

-export([start_link/0]).
-export([running_nodes_filename/0,
         cluster_status_filename/0, prepare_cluster_status_files/0,
         write_cluster_status/1, read_cluster_status/0,
         update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, status/1, subscribe/1]).
-export([pause_partition_guard/0]).
-export([global_sync/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
         alive_nodes/1, alive_rabbit_nodes/1]).

-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
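
%% A rough guide to the #state{} record below:
%%   monitors        - pmon set of monitored {rabbit, Node} processes
%%   partitions      - nodes we believe we are (or were) partitioned from
%%   subscribers     - pmon set of processes to be sent {node_down, Node}
%%   down_ping_timer - timer driving ping_down_nodes while some nodes are down
%%   keepalive_timer - timer driving ping_up_nodes (keepalive casts)
%%   autoheal        - opaque rabbit_autoheal state
%%   guid/node_guids - per-start GUIDs used for partial partition detection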
-record(state, {monitors, partitions, subscribers, down_ping_timer,
                keepalive_timer, autoheal, guid, node_guids}).

%%----------------------------------------------------------------------------

-spec start_link() -> rabbit_types:ok_pid_or_error().

-spec running_nodes_filename() -> string().
-spec cluster_status_filename() -> string().
-spec prepare_cluster_status_files() -> 'ok'.
-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
-spec read_cluster_status() -> rabbit_mnesia:cluster_status().
-spec update_cluster_status() -> 'ok'.
-spec reset_cluster_status() -> 'ok'.

-spec notify_node_up() -> 'ok'.
-spec notify_joined_cluster() -> 'ok'.
-spec notify_left_cluster(node()) -> 'ok'.

-spec partitions() -> [node()].
-spec partitions([node()]) -> [{node(), [node()]}].
-spec status([node()]) -> {[{node(), [node()]}], [node()]}.
-spec subscribe(pid()) -> 'ok'.
-spec pause_partition_guard() -> 'ok' | 'pausing'.

-spec all_rabbit_nodes_up() -> boolean().
-spec run_outside_applications(fun (() -> any()), boolean()) -> pid().
-spec ping_all() -> 'ok'.
-spec alive_nodes([node()]) -> [node()].
-spec alive_rabbit_nodes([node()]) -> [node()].

%%----------------------------------------------------------------------------
%%----------------------------------------------------------------------------

start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------

%% The cluster file information is kept in two files. The "cluster
%% status file" contains all the clustered nodes and the disc nodes.
%% The "running nodes file" contains the currently running nodes or
%% the running nodes at shutdown when the node is down.
%%
%% We strive to keep the files up to date and we rely on this
%% assumption in various situations. Obviously when mnesia is offline
%% the information we have will be outdated, but it cannot be
%% otherwise.
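%%
%% For illustration only (hypothetical node names): with a cluster made of
%% 'rabbit@a' (disc) and 'rabbit@b' (ram), both running, write_cluster_status/1
%% below would leave terms along the lines of:
%%
%%   cluster_nodes.config      : [{[rabbit@a,rabbit@b],[rabbit@a]}].
%%   nodes_running_at_shutdown : [[rabbit@a,rabbit@b]].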

running_nodes_filename() ->
    filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").

cluster_status_filename() ->
    rabbit_mnesia:dir() ++ "/cluster_nodes.config".

prepare_cluster_status_files() ->
    rabbit_mnesia:ensure_mnesia_dir(),
    Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
    RunningNodes1 = case try_read_file(running_nodes_filename()) of
                        {ok, [Nodes]} when is_list(Nodes) -> Nodes;
                        {ok, Other}                       -> Corrupt(Other);
                        {error, enoent}                   -> []
                    end,
    ThisNode = [node()],
    %% The running nodes file might contain a set or a list, in case
    %% of the legacy file
    RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
    {AllNodes1, DiscNodes} =
        case try_read_file(cluster_status_filename()) of
            {ok, [{AllNodes, DiscNodes0}]} ->
                {AllNodes, DiscNodes0};
            {ok, [AllNodes0]} when is_list(AllNodes0) ->
                {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
            {ok, Files} ->
                Corrupt(Files);
            {error, enoent} ->
                LegacyNodes = legacy_cluster_nodes([]),
                {LegacyNodes, LegacyNodes}
        end,
    AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
    ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).

write_cluster_status({All, Disc, Running}) ->
    ClusterStatusFN = cluster_status_filename(),
    Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
              ok ->
                  RunningNodesFN = running_nodes_filename(),
                  {RunningNodesFN,
                   rabbit_file:write_term_file(RunningNodesFN, [Running])};
              E1 = {error, _} ->
                  {ClusterStatusFN, E1}
          end,
    case Res of
        {_, ok}           -> ok;
        {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
    end.

read_cluster_status() ->
    case {try_read_file(cluster_status_filename()),
          try_read_file(running_nodes_filename())} of
        {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
            {All, Disc, Running};
        {Stat, Run} ->
            throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
    end.

update_cluster_status() ->
    {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
    write_cluster_status(Status).

reset_cluster_status() ->
    write_cluster_status({[node()], [node()], [node()]}).

%%----------------------------------------------------------------------------
%% Cluster notifications
%%----------------------------------------------------------------------------

notify_node_up() ->
    gen_server:cast(?SERVER, notify_node_up).

notify_joined_cluster() ->
    Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
    gen_server:abcast(Nodes, ?SERVER,
                      {joined_cluster, node(), rabbit_mnesia:node_type()}),
    ok.

notify_left_cluster(Node) ->
    Nodes = rabbit_mnesia:cluster_nodes(running),
    gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
    ok.

%%----------------------------------------------------------------------------
%%----------------------------------------------------------------------------

partitions() ->
    gen_server:call(?SERVER, partitions, infinity).

partitions(Nodes) ->
    {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
    Replies.

status(Nodes) ->
    gen_server:multi_call(Nodes, ?SERVER, status, infinity).

subscribe(Pid) ->
    gen_server:cast(?SERVER, {subscribe, Pid}).
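
%% Processes registered via subscribe/1 are sent a {node_down, Node} message
%% when the rabbit application on Node is seen to go down (see the 'DOWN'
%% clause of handle_info/2 below).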

%%----------------------------------------------------------------------------
%% pause_minority/pause_if_all_down safety
%%----------------------------------------------------------------------------

%% If we are in a minority and pause_minority mode then a) we are
%% going to shut down imminently and b) we should not confirm anything
%% until then, since anything we confirm is likely to be lost.
%%
%% The same principles apply to a node which isn't part of the preferred
%% partition when we are in pause_if_all_down mode.
%%
%% We could confirm something by having an HA queue see the pausing
%% state (and fail over into it) before the node monitor stops us, or
%% by using unmirrored queues and just having them vanish (and
%% confirming messages as thrown away).
%%
%% So we have channels call in here before issuing confirms, to do a
%% lightweight check that we have not entered a pausing state.
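%%
%% A caller would use it roughly like this (a sketch only; the
%% handle_confirms/drop_confirms names are hypothetical, not part of this
%% module):
%%
%%   case rabbit_node_monitor:pause_partition_guard() of
%%       ok      -> handle_confirms();
%%       pausing -> drop_confirms()
%%   end.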

pause_partition_guard() ->
    case get(pause_partition_guard) of
        not_pause_mode ->
            ok;
        undefined ->
            {ok, M} = application:get_env(rabbit, cluster_partition_handling),
            case M of
                pause_minority ->
                    pause_minority_guard([], ok);
                {pause_if_all_down, PreferredNodes, _} ->
                    pause_if_all_down_guard(PreferredNodes, [], ok);
                _ ->
                    put(pause_partition_guard, not_pause_mode),
                    ok
            end;
        {minority_mode, Nodes, LastState} ->
            pause_minority_guard(Nodes, LastState);
        {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
            pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
    end.

pause_minority_guard(LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case majority() of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {minority_mode, nodes(), NewState}),
                     NewState
    end.

pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case in_preferred_partition(PreferredNodes) of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {pause_if_all_down_mode, PreferredNodes, nodes(),
                          NewState}),
                     NewState
    end.

%%----------------------------------------------------------------------------
%% "global" hang workaround.
%%----------------------------------------------------------------------------

%% This code works around a possible inconsistency in the "global"
%% state, causing global:sync/0 to never return.
%%
%% 1. A process is spawned.
%% 2. If after 15", global:sync() didn't return, the "global"
%%    state is parsed.
%% 3. If it detects that a sync is blocked for more than 10",
%%    the process sends fake nodedown/nodeup events to the two
%%    nodes involved (one local, one remote).
%% 4. Both "global" instances restart their synchronisation.
%% 5. global:sync() finally returns.
%%
%% FIXME: Remove this workaround, once we got rid of the change to
%% "dist_auto_connect" and fixed the bugs uncovered.

global_sync() ->
    Pid = spawn(fun workaround_global_hang/0),
    ok = global:sync(),
    Pid ! global_sync_done,
    ok.

workaround_global_hang() ->
    receive
        global_sync_done ->
            ok
    after 10000 ->
            find_blocked_global_peers()
    end.

find_blocked_global_peers() ->
    Snapshot1 = snapshot_global_dict(),
    timer:sleep(5000),
    Snapshot2 = snapshot_global_dict(),
    find_blocked_global_peers1(Snapshot2, Snapshot1).

snapshot_global_dict() ->
    {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
    [E || {{sync_tag_his, _}, _} = E <- Dict].

find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
                           OlderSnapshot) ->
    case lists:member(Item, OlderSnapshot) of
        true  -> unblock_global_peer(Peer);
        false -> ok
    end,
    find_blocked_global_peers1(Rest, OlderSnapshot);
find_blocked_global_peers1([], _) ->
    ok.

unblock_global_peer(PeerNode) ->
    ThisNode = node(),
    PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
    error_logger:info_msg(
      "Global hang workaround: global state on ~s seems broken~n"
      " * Peer global state: ~p~n"
      " * Local global state: ~p~n"
      "Faking nodedown/nodeup between ~s and ~s~n",
      [PeerNode, PeerState, sys:get_status(global_name_server),
       PeerNode, ThisNode]),
    {global_name_server, ThisNode} ! {nodedown, PeerNode},
    {global_name_server, PeerNode} ! {nodedown, ThisNode},
    {global_name_server, ThisNode} ! {nodeup, PeerNode},
    {global_name_server, PeerNode} ! {nodeup, ThisNode},
    ok.

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

init([]) ->
    %% We trap exits so that the supervisor will not just kill us. We
    %% want to be sure that we are not going to be killed while
    %% writing out the cluster status files - bad things can then
    %% happen.
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true, [nodedown_reason]),
    {ok, _} = mnesia:subscribe(system),
    %% If the node has been restarted, Mnesia can trigger a system notification
    %% before the monitor subscribes to receive them. To avoid autoheal blocking
    %% due to the inconsistent database event never arriving, we begin monitoring
    %% all running nodes as early as possible. The rest of the monitoring ops
    %% will only be triggered when notifications arrive.
    Nodes = possibly_partitioned_nodes(),
    startup_log(Nodes),
    Monitors = lists:foldl(fun(Node, Monitors0) ->
                                   pmon:monitor({rabbit, Node}, Monitors0)
                           end, pmon:new(), Nodes),
    {ok, ensure_keepalive_timer(#state{monitors    = Monitors,
                                       subscribers = pmon:new(),
                                       partitions  = [],
                                       guid        = rabbit_guid:gen(),
                                       node_guids  = orddict:new(),
                                       autoheal    = rabbit_autoheal:init()})}.

handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
    {reply, Partitions, State};

handle_call(status, _From, State = #state{partitions = Partitions}) ->
    {reply, [{partitions, Partitions},
             {nodes,      [node() | nodes()]}], State};

handle_call(_Request, _From, State) ->
    {noreply, State}.

handle_cast(notify_node_up, State = #state{guid = GUID}) ->
    Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
    gen_server:abcast(Nodes, ?SERVER,
                      {node_up, node(), rabbit_mnesia:node_type(), GUID}),
    %% register other active rabbits with this rabbit
    DiskNodes = rabbit_mnesia:cluster_nodes(disc),
    [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
                                               true  -> disc;
                                               false -> ram
                                           end}) || N <- Nodes],
    {noreply, State};

%%----------------------------------------------------------------------------
%% Partial partition detection
%%
%% Every node generates a GUID each time it starts, and announces that
%% GUID in 'node_up', with 'announce_guid' sent by return so the new
%% node knows the GUIDs of the others. These GUIDs are sent in all the
%% partial partition related messages to ensure that we ignore partial
%% partition messages from before we restarted (to avoid getting stuck
%% in a loop).
%%
%% When one node gets nodedown from another, it then sends
%% 'check_partial_partition' to all the nodes it still thinks are
%% alive. If any of those (intermediate) nodes still see the "down"
%% node as up, they inform it that this has happened. The original
%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
%% disconnect from the intermediate node to "upgrade" to a full
%% partition.
%%
%% In pause_minority mode it will instead immediately pause until all
%% nodes come back. This is because the contract for pause_minority is
%% that nodes should never sit in a partitioned state - if it just
%% disconnected, it would become a minority, pause, realise it's not
%% in a minority any more, and come back, still partitioned (albeit no
%% longer partially).
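%%
%% A sketch of the exchange (illustrative node names): node A gets nodedown
%% for B while C can still talk to both. A casts {check_partial_partition,
%% B, A, ...} to C; C sees that rabbit is still running on B and casts
%% {partial_partition, B, C, ...} back to A; A then either pauses or
%% disconnects from C, depending on the cluster_partition_handling mode,
%% as implemented below.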
%% ----------------------------------------------------------------------------

handle_cast({node_up, Node, NodeType, GUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    cast(Node, {announce_guid, node(), MyGUID}),
    GUIDs1 = orddict:store(Node, GUID, GUIDs),
    handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});

handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
    {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};

handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
        orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of
        true  -> spawn_link( %%[1]
                   fun () ->
                           case rpc:call(Node, rabbit, is_running, []) of
                               {badrpc, _} -> ok;
                               _           ->
                                   rabbit_log:warning("Received a 'DOWN' message"
                                                      " from ~p but still can"
                                                      " communicate with it ~n",
                                                      [Node]),
                                   cast(Rep, {partial_partition,
                                              Node, node(), RepGUID})
                           end
                   end);
        false -> ok
    end,
    {noreply, State};
%% [1] We checked that we haven't heard the node go down - but we
%% really should make sure we can actually communicate with
%% it. Otherwise there's a race where we falsely detect a partial
%% partition.
%%
%% Now of course the rpc:call/4 may take a long time to return if
%% connectivity with the node is actually interrupted - but that's OK,
%% we only really want to do something in a timely manner if
%% connectivity is OK. However, of course as always we must not block
%% the node monitor, so we do the check in a separate process.

handle_cast({check_partial_partition, _Node, _Reporter,
             _NodeGUID, _GUID, _ReporterGUID}, State) ->
    {noreply, State};

handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
            State = #state{guid = MyGUID}) ->
    FmtBase = "Partial partition detected:~n"
        " * We saw DOWN from ~s~n"
        " * We can still see ~s which can see ~s~n",
    ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            rabbit_log:error(
              FmtBase ++ " * pause_minority mode enabled~n"
              "We will therefore pause until the *entire* cluster recovers~n",
              ArgsBase),
            await_cluster_recovery(fun all_nodes_up/0),
            {noreply, State};
        {ok, {pause_if_all_down, PreferredNodes, _}} ->
            case in_preferred_partition(PreferredNodes) of
                true  -> rabbit_log:error(
                           FmtBase ++ "We will therefore intentionally "
                           "disconnect from ~s~n", ArgsBase ++ [Proxy]),
                         upgrade_to_full_partition(Proxy);
                false -> rabbit_log:info(
                           FmtBase ++ "We are about to pause, no need "
                           "for further actions~n", ArgsBase)
            end,
            {noreply, State};
        {ok, _} ->
            rabbit_log:error(
              FmtBase ++ "We will therefore intentionally disconnect from ~s~n",
              ArgsBase ++ [Proxy]),
            upgrade_to_full_partition(Proxy),
            {noreply, State}
    end;

handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
    {noreply, State};

%% Sometimes it appears the Erlang VM does not give us nodedown
%% messages reliably when another node disconnects from us. Therefore
%% we are told just before the disconnection so we can reciprocate.
handle_cast({partial_partition_disconnect, Other}, State) ->
    rabbit_log:error("Partial partition disconnect from ~s~n", [Other]),
    disconnect(Other),
    {noreply, State};

%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
            State = #state{monitors = Monitors}) ->
    rabbit_log:info("rabbit on node ~p up~n", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          add_node(Node, RunningNodes)}),
    ok = handle_live_rabbit(Node),
    Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                    true ->
                        Monitors;
                    false ->
                        pmon:monitor({rabbit, Node}, Monitors)
                end,
    {noreply, maybe_autoheal(State#state{monitors = Monitors1})};

handle_cast({joined_cluster, Node, NodeType}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          RunningNodes}),
    {noreply, State};

handle_cast({left_cluster, Node}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
                          del_node(Node, RunningNodes)}),
    {noreply, State};

handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};

handle_cast(keepalive, State) ->
    {noreply, State};

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
            State = #state{monitors = Monitors, subscribers = Subscribers}) ->
    rabbit_log:info("rabbit on node ~p down~n", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
    [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
    {noreply, handle_dead_rabbit(
                Node,
                State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};

handle_info({'DOWN', _MRef, process, Pid, _Reason},
            State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};

handle_info({nodedown, Node, Info}, State = #state{guid       = MyGUID,
                                                   node_guids = GUIDs}) ->
    rabbit_log:info("node ~p down: ~p~n",
                    [Node, proplists:get_value(nodedown_reason, Info)]),
    Check = fun (N, CheckGUID, DownGUID) ->
                    cast(N, {check_partial_partition,
                             Node, node(), DownGUID, CheckGUID, MyGUID})
            end,
    case orddict:find(Node, GUIDs) of
        {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
                              -- [node(), Node],
                          [case orddict:find(N, GUIDs) of
                               {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
                               error           -> ok
                           end || N <- Alive];
        error          -> ok
    end,
    {noreply, handle_dead_node(Node, State)};

handle_info({nodeup, Node, _Info}, State) ->
    rabbit_log:info("node ~p up~n", [Node]),
    {noreply, State};

handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors}) ->
    %% We will not get a node_up from this node - yet we should treat it as
    %% up (mostly).
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = lists:usort([Node | Partitions]),
    {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};

handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
                                                partitions = Partitions}) ->
    AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
    {noreply, State#state{autoheal = AState1}};

handle_info(ping_down_nodes, State) ->
    %% We ping nodes when some are down to ensure that we find out
    %% about healed partitions quickly. We ping all nodes rather than
    %% just the ones we know are down for simplicity; it's not expensive
    %% to ping the nodes that are up, after all.
    State1 = State#state{down_ping_timer = undefined},
    Self = self(),
    %% We ping in a separate process since in a partition it might
    %% take some noticeable length of time and we don't want to block
    %% the node monitor for that long.
    spawn_link(fun () ->
                       ping_all(),
                       case all_nodes_up() of
                           true  -> ok;
                           false -> Self ! ping_down_nodes_again
                       end
               end),
    {noreply, State1};

handle_info(ping_down_nodes_again, State) ->
    {noreply, ensure_ping_timer(State)};

handle_info(ping_up_nodes, State) ->
    %% In this case we need to ensure that we ping "quickly" -
    %% i.e. only nodes that we know to be up.
    [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
    {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};

handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) ->
    AState = rabbit_autoheal:process_down(Info, AState0),
    {noreply, State#state{autoheal = AState}};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    rabbit_misc:stop_timer(State, #state.down_ping_timer),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%----------------------------------------------------------------------------
%% Functions that call the module specific hooks when nodes go up/down
%%----------------------------------------------------------------------------

handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
    %% In general in rabbit_node_monitor we care about whether the
    %% rabbit application is up rather than the node; we do this so
    %% that we can respond in the same way to "rabbitmqctl stop_app"
    %% and "rabbitmqctl stop" as much as possible.
    %%
    %% However, for pause_minority and pause_if_all_down modes we can't do
    %% this, since we depend on looking at whether other nodes are up
    %% to decide whether to come back up ourselves - if we decide that
    %% based on the rabbit application we would go down and never come
    %% back.
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            case majority([Node]) of
                true  -> ok;
                false -> await_cluster_recovery(fun majority/0)
            end,
            State;
        {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
            case in_preferred_partition(PreferredNodes, [Node]) of
                true  -> ok;
                false -> await_cluster_recovery(
                           fun in_preferred_partition/0)
            end,
            case HowToRecover of
                autoheal -> State#state{autoheal =
                                rabbit_autoheal:node_down(Node, Autoheal)};
                _        -> State
            end;
        {ok, ignore} ->
            State;
        {ok, autoheal} ->
            State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
        {ok, Term} ->
            rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
                               "assuming 'ignore'~n", [Term]),
            State
    end.

await_cluster_recovery(Condition) ->
    rabbit_log:warning("Cluster minority/secondary status detected - "
                       "awaiting recovery~n", []),
    run_outside_applications(fun () ->
                                     rabbit:stop(),
                                     wait_for_cluster_recovery(Condition)
                             end, false),
    ok.

run_outside_applications(Fun, WaitForExistingProcess) ->
    spawn_link(fun () ->
                       %% Ignore exit messages from the monitor - the link is needed
                       %% to ensure the monitor detects abnormal exits from this process
                       %% and can reset the 'restarting' status on the autoheal, avoiding
                       %% a deadlock. The monitor is restarted when rabbit does, so messages
                       %% in the other direction should be ignored.
                       process_flag(trap_exit, true),
                       %% If our group leader is inside an application we are about
                       %% to stop, application:stop/1 does not return.
                       group_leader(whereis(init), self()),
                       register_outside_app_process(Fun, WaitForExistingProcess)
               end).

register_outside_app_process(Fun, WaitForExistingProcess) ->
    %% Ensure only one such process at a time, the exit(badarg) is
    %% harmless if one is already running.
    %%
    %% If WaitForExistingProcess is false, the given fun is simply not
    %% executed at all and the process exits.
    %%
    %% If WaitForExistingProcess is true, we wait for the end of the
    %% currently running process before executing the given function.
    try register(rabbit_outside_app_process, self()) of
        true ->
            do_run_outside_app_fun(Fun)
    catch
        error:badarg when WaitForExistingProcess ->
            MRef = erlang:monitor(process, rabbit_outside_app_process),
            receive
                {'DOWN', MRef, _, _, _} ->
                    %% The existing process exited, let's try to
                    %% register again
                    register_outside_app_process(Fun, WaitForExistingProcess)
            end;
        error:badarg ->
            ok
    end.

do_run_outside_app_fun(Fun) ->
    try
        Fun()
    catch _:E ->
            rabbit_log:error(
              "rabbit_outside_app_process:~n~p~n~p~n",
              [E, erlang:get_stacktrace()])
    end.

wait_for_cluster_recovery(Condition) ->
    ping_all(),
    case Condition() of
        true  -> rabbit:start();
        false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
                 wait_for_cluster_recovery(Condition)
    end.

handle_dead_rabbit(Node, State = #state{partitions = Partitions,
                                        autoheal   = Autoheal}) ->
    %% TODO: This may turn out to be a performance hog when there are
    %% lots of nodes. We really only need to execute some of these
    %% statements on *one* node, rather than all of them.
    ok = rabbit_networking:on_node_down(Node),
    ok = rabbit_amqqueue:on_node_down(Node),
    ok = rabbit_alarm:on_node_down(Node),
    ok = rabbit_mnesia:on_node_down(Node),
    %% If we have been partitioned, and we are now in the only remaining
    %% partition, we no longer care about partitions - forget them. Note
    %% that we do not attempt to deal with individual (other) partitions
    %% going away. It's only safe to forget anything about partitions when
    %% there are no partitions.
    Down = Partitions -- alive_rabbit_nodes(),
    NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
    Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
                      [] -> [];
                      _  -> Partitions
                  end,
    ensure_ping_timer(
      State#state{partitions = Partitions1,
                  autoheal   = rabbit_autoheal:rabbit_down(Node, Autoheal)}).

ensure_ping_timer(State) ->
    rabbit_misc:ensure_timer(
      State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
      ping_down_nodes).

ensure_keepalive_timer(State) ->
    {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
    rabbit_misc:ensure_timer(
      State, #state.keepalive_timer, Interval, ping_up_nodes).

handle_live_rabbit(Node) ->
    ok = rabbit_amqqueue:on_node_up(Node),
    ok = rabbit_alarm:on_node_up(Node),
    ok = rabbit_mnesia:on_node_up(Node).

maybe_autoheal(State = #state{partitions = []}) ->
    State;

maybe_autoheal(State = #state{autoheal = AState}) ->
    case all_nodes_up() of
        true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
        false -> State
    end.

%%--------------------------------------------------------------------
%%--------------------------------------------------------------------

try_read_file(FileName) ->
    case rabbit_file:read_term_file(FileName) of
        {ok, Term}      -> {ok, Term};
        {error, enoent} -> {error, enoent};
        {error, E}      -> throw({error, {cannot_read_file, FileName, E}})
    end.

legacy_cluster_nodes(Nodes) ->
    %% We get all the info that we can, including the nodes from
    %% mnesia, which will be there if the node is a disc node (empty
    %% list otherwise)
    lists:usort(Nodes ++ mnesia:system_info(db_nodes)).

legacy_disc_nodes(AllNodes) ->
    case AllNodes == [] orelse lists:member(node(), AllNodes) of
        true  -> [node()];
        false -> []
    end.

add_node(Node, Nodes) -> lists:usort([Node | Nodes]).

del_node(Node, Nodes) -> Nodes -- [Node].

cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).

upgrade_to_full_partition(Proxy) ->
    cast(Proxy, {partial_partition_disconnect, node()}),
    disconnect(Proxy).

%% When we call this, it's because we want to force Mnesia to detect a
%% partition. But if we just disconnect_node/1 then Mnesia won't
%% detect a very short partition. So we want to force a slightly
%% longer disconnect. Unfortunately we don't have a way to blacklist
%% individual nodes; the best we can do is turn off auto-connect
%% altogether.
disconnect(Node) ->
    application:set_env(kernel, dist_auto_connect, never),
    erlang:disconnect_node(Node),
    timer:sleep(1000),
    application:unset_env(kernel, dist_auto_connect),
    ok.

%%--------------------------------------------------------------------

%% mnesia:system_info(db_nodes) (and hence
%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
%% when partitioned, just those that we are sharing Mnesia state
%% with. So we have a small set of replacement functions
%% here. "rabbit" in a function's name implies we test if the rabbit
%% application is up, not just the node.
%%
%% As we use these functions to decide what to do in pause_minority or
%% pause_if_all_down states, they *must* be fast, even in the case where
%% TCP connections are timing out. So that means we should be careful
%% about whether we connect to nodes which are currently disconnected.
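%%
%% For example, in a three-node cluster where this node can see one other
%% node, majority/0 below evaluates 2 / 3 > 0.5 and returns true; if it can
%% see no other node it evaluates 1 / 3 > 0.5 and returns false.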

majority() ->
    majority([]).

majority(NodesDown) ->
    Nodes = rabbit_mnesia:cluster_nodes(all),
    AliveNodes = alive_nodes(Nodes) -- NodesDown,
    length(AliveNodes) / length(Nodes) > 0.5.

in_preferred_partition() ->
    {ok, {pause_if_all_down, PreferredNodes, _}} =
        application:get_env(rabbit, cluster_partition_handling),
    in_preferred_partition(PreferredNodes).

in_preferred_partition(PreferredNodes) ->
    in_preferred_partition(PreferredNodes, []).

in_preferred_partition(PreferredNodes, NodesDown) ->
    Nodes = rabbit_mnesia:cluster_nodes(all),
    RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
    AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
    RealPreferredNodes =:= [] orelse AliveNodes =/= [].

all_nodes_up() ->
    Nodes = rabbit_mnesia:cluster_nodes(all),
    length(alive_nodes(Nodes)) =:= length(Nodes).

all_rabbit_nodes_up() ->
    Nodes = rabbit_mnesia:cluster_nodes(all),
    length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).

alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].

alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).

alive_rabbit_nodes(Nodes) ->
    [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].

%% This one is allowed to connect!
ping_all() ->
    [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
    ok.

possibly_partitioned_nodes() ->
    alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).

startup_log([]) ->
    rabbit_log:info("Starting rabbit_node_monitor~n", []);
startup_log(Nodes) ->
    rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
                    [Nodes]).