]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob - rabbitmq-server/deps/rabbit/src/rabbit_node_monitor.erl
Update rabbitmq-server
[packages/trusty/rabbitmq-server.git] / rabbitmq-server / deps / rabbit / src / rabbit_node_monitor.erl
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2007-2017 Pivotal Software, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_node_monitor).
18
19 -behaviour(gen_server).
20
21 -export([start_link/0]).
22 -export([running_nodes_filename/0,
23          cluster_status_filename/0, prepare_cluster_status_files/0,
24          write_cluster_status/1, read_cluster_status/0,
25          update_cluster_status/0, reset_cluster_status/0]).
26 -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
27 -export([partitions/0, partitions/1, status/1, subscribe/1]).
28 -export([pause_partition_guard/0]).
29 -export([global_sync/0]).
30
31 %% gen_server callbacks
32 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
33          code_change/3]).
34
35  %% Utils
36 -export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
37          alive_nodes/1, alive_rabbit_nodes/1]).
38
39 -define(SERVER, ?MODULE).
40 -define(RABBIT_UP_RPC_TIMEOUT, 2000).
41 -define(RABBIT_DOWN_PING_INTERVAL, 1000).
42
43 -record(state, {monitors, partitions, subscribers, down_ping_timer,
44                 keepalive_timer, autoheal, guid, node_guids}).
45
46 %%----------------------------------------------------------------------------
47
48 -spec start_link() -> rabbit_types:ok_pid_or_error().
49
50 -spec running_nodes_filename() -> string().
51 -spec cluster_status_filename() -> string().
52 -spec prepare_cluster_status_files() -> 'ok'.
53 -spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.
54 -spec read_cluster_status() -> rabbit_mnesia:cluster_status().
55 -spec update_cluster_status() -> 'ok'.
56 -spec reset_cluster_status() -> 'ok'.
57
58 -spec notify_node_up() -> 'ok'.
59 -spec notify_joined_cluster() -> 'ok'.
60 -spec notify_left_cluster(node()) -> 'ok'.
61
62 -spec partitions() -> [node()].
63 -spec partitions([node()]) -> [{node(), [node()]}].
64 -spec status([node()]) -> {[{node(), [node()]}], [node()]}.
65 -spec subscribe(pid()) -> 'ok'.
66 -spec pause_partition_guard() -> 'ok' | 'pausing'.
67
68 -spec all_rabbit_nodes_up() -> boolean().
69 -spec run_outside_applications(fun (() -> any()), boolean()) -> pid().
70 -spec ping_all() -> 'ok'.
71 -spec alive_nodes([node()]) -> [node()].
72 -spec alive_rabbit_nodes([node()]) -> [node()].
73
74 %%----------------------------------------------------------------------------
75 %% Start
76 %%----------------------------------------------------------------------------
77
78 start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
79
80 %%----------------------------------------------------------------------------
81 %% Cluster file operations
82 %%----------------------------------------------------------------------------
83
84 %% The cluster file information is kept in two files.  The "cluster
85 %% status file" contains all the clustered nodes and the disc nodes.
86 %% The "running nodes file" contains the currently running nodes or
87 %% the running nodes at shutdown when the node is down.
88 %%
89 %% We strive to keep the files up to date and we rely on this
90 %% assumption in various situations. Obviously when mnesia is offline
91 %% the information we have will be outdated, but it cannot be
92 %% otherwise.
93
94 running_nodes_filename() ->
95     filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
96
97 cluster_status_filename() ->
98     rabbit_mnesia:dir() ++ "/cluster_nodes.config".
99
100 prepare_cluster_status_files() ->
101     rabbit_mnesia:ensure_mnesia_dir(),
102     Corrupt = fun(F) -> throw({error, corrupt_cluster_status_files, F}) end,
103     RunningNodes1 = case try_read_file(running_nodes_filename()) of
104                         {ok, [Nodes]} when is_list(Nodes) -> Nodes;
105                         {ok, Other}                       -> Corrupt(Other);
106                         {error, enoent}                   -> []
107                     end,
108     ThisNode = [node()],
109     %% The running nodes file might contain a set or a list, in case
110     %% of the legacy file
111     RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
112     {AllNodes1, DiscNodes} =
113         case try_read_file(cluster_status_filename()) of
114             {ok, [{AllNodes, DiscNodes0}]} ->
115                 {AllNodes, DiscNodes0};
116             {ok, [AllNodes0]} when is_list(AllNodes0) ->
117                 {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
118             {ok, Files} ->
119                 Corrupt(Files);
120             {error, enoent} ->
121                 LegacyNodes = legacy_cluster_nodes([]),
122                 {LegacyNodes, LegacyNodes}
123         end,
124     AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
125     ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
126
127 write_cluster_status({All, Disc, Running}) ->
128     ClusterStatusFN = cluster_status_filename(),
129     Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
130               ok ->
131                   RunningNodesFN = running_nodes_filename(),
132                   {RunningNodesFN,
133                    rabbit_file:write_term_file(RunningNodesFN, [Running])};
134               E1 = {error, _} ->
135                   {ClusterStatusFN, E1}
136           end,
137     case Res of
138         {_, ok}           -> ok;
139         {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
140     end.
141
142 read_cluster_status() ->
143     case {try_read_file(cluster_status_filename()),
144           try_read_file(running_nodes_filename())} of
145         {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
146             {All, Disc, Running};
147         {Stat, Run} ->
148             throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
149     end.
150
151 update_cluster_status() ->
152     {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
153     write_cluster_status(Status).
154
155 reset_cluster_status() ->
156     write_cluster_status({[node()], [node()], [node()]}).
157
158 %%----------------------------------------------------------------------------
159 %% Cluster notifications
160 %%----------------------------------------------------------------------------
161
162 notify_node_up() ->
163     gen_server:cast(?SERVER, notify_node_up).
164
165 notify_joined_cluster() ->
166     Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
167     gen_server:abcast(Nodes, ?SERVER,
168                       {joined_cluster, node(), rabbit_mnesia:node_type()}),
169     ok.
170
171 notify_left_cluster(Node) ->
172     Nodes = rabbit_mnesia:cluster_nodes(running),
173     gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
174     ok.
175
176 %%----------------------------------------------------------------------------
177 %% Server calls
178 %%----------------------------------------------------------------------------
179
180 partitions() ->
181     gen_server:call(?SERVER, partitions, infinity).
182
183 partitions(Nodes) ->
184     {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
185     Replies.
186
187 status(Nodes) ->
188     gen_server:multi_call(Nodes, ?SERVER, status, infinity).
189
190 subscribe(Pid) ->
191     gen_server:cast(?SERVER, {subscribe, Pid}).
192
193 %%----------------------------------------------------------------------------
194 %% pause_minority/pause_if_all_down safety
195 %%----------------------------------------------------------------------------
196
197 %% If we are in a minority and pause_minority mode then a) we are
198 %% going to shut down imminently and b) we should not confirm anything
199 %% until then, since anything we confirm is likely to be lost.
200 %%
201 %% The same principles apply to a node which isn't part of the preferred
202 %% partition when we are in pause_if_all_down mode.
203 %%
204 %% We could confirm something by having an HA queue see the pausing
205 %% state (and fail over into it) before the node monitor stops us, or
206 %% by using unmirrored queues and just having them vanish (and
207 %% confiming messages as thrown away).
208 %%
209 %% So we have channels call in here before issuing confirms, to do a
210 %% lightweight check that we have not entered a pausing state.
211
212 pause_partition_guard() ->
213     case get(pause_partition_guard) of
214         not_pause_mode ->
215             ok;
216         undefined ->
217             {ok, M} = application:get_env(rabbit, cluster_partition_handling),
218             case M of
219                 pause_minority ->
220                     pause_minority_guard([], ok);
221                 {pause_if_all_down, PreferredNodes, _} ->
222                     pause_if_all_down_guard(PreferredNodes, [], ok);
223                 _ ->
224                     put(pause_partition_guard, not_pause_mode),
225                     ok
226             end;
227         {minority_mode, Nodes, LastState} ->
228             pause_minority_guard(Nodes, LastState);
229         {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
230             pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
231     end.
232
233 pause_minority_guard(LastNodes, LastState) ->
234     case nodes() of
235         LastNodes -> LastState;
236         _         -> NewState = case majority() of
237                                     false -> pausing;
238                                     true  -> ok
239                                 end,
240                      put(pause_partition_guard,
241                          {minority_mode, nodes(), NewState}),
242                      NewState
243     end.
244
245 pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
246     case nodes() of
247         LastNodes -> LastState;
248         _         -> NewState = case in_preferred_partition(PreferredNodes) of
249                                     false -> pausing;
250                                     true  -> ok
251                                 end,
252                      put(pause_partition_guard,
253                          {pause_if_all_down_mode, PreferredNodes, nodes(),
254                           NewState}),
255                      NewState
256     end.
257
258 %%----------------------------------------------------------------------------
259 %% "global" hang workaround.
260 %%----------------------------------------------------------------------------
261
262 %% This code works around a possible inconsistency in the "global"
263 %% state, causing global:sync/0 to never return.
264 %%
265 %%     1. A process is spawned.
266 %%     2. If after 15", global:sync() didn't return, the "global"
267 %%        state is parsed.
268 %%     3. If it detects that a sync is blocked for more than 10",
269 %%        the process sends fake nodedown/nodeup events to the two
270 %%        nodes involved (one local, one remote).
271 %%     4. Both "global" instances restart their synchronisation.
272 %%     5. globao:sync() finally returns.
273 %%
274 %% FIXME: Remove this workaround, once we got rid of the change to
275 %% "dist_auto_connect" and fixed the bugs uncovered.
276
277 global_sync() ->
278     Pid = spawn(fun workaround_global_hang/0),
279     ok = global:sync(),
280     Pid ! global_sync_done,
281     ok.
282
283 workaround_global_hang() ->
284     receive
285         global_sync_done ->
286             ok
287     after 10000 ->
288             find_blocked_global_peers()
289     end.
290
291 find_blocked_global_peers() ->
292     Snapshot1 = snapshot_global_dict(),
293     timer:sleep(10000),
294     Snapshot2 = snapshot_global_dict(),
295     find_blocked_global_peers1(Snapshot2, Snapshot1).
296
297 snapshot_global_dict() ->
298     {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
299     [E || {{sync_tag_his, _}, _} = E <- Dict].
300
301 find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
302   OlderSnapshot) ->
303     case lists:member(Item, OlderSnapshot) of
304         true  -> unblock_global_peer(Peer);
305         false -> ok
306     end,
307     find_blocked_global_peers1(Rest, OlderSnapshot);
308 find_blocked_global_peers1([], _) ->
309     ok.
310
311 unblock_global_peer(PeerNode) ->
312     ThisNode = node(),
313     PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
314     error_logger:info_msg(
315       "Global hang workaround: global state on ~s seems broken~n"
316       " * Peer global state:  ~p~n"
317       " * Local global state: ~p~n"
318       "Faking nodedown/nodeup between ~s and ~s~n",
319       [PeerNode, PeerState, sys:get_status(global_name_server),
320        PeerNode, ThisNode]),
321     {global_name_server, ThisNode} ! {nodedown, PeerNode},
322     {global_name_server, PeerNode} ! {nodedown, ThisNode},
323     {global_name_server, ThisNode} ! {nodeup, PeerNode},
324     {global_name_server, PeerNode} ! {nodeup, ThisNode},
325     ok.
326
327 %%----------------------------------------------------------------------------
328 %% gen_server callbacks
329 %%----------------------------------------------------------------------------
330
331 init([]) ->
332     %% We trap exits so that the supervisor will not just kill us. We
333     %% want to be sure that we are not going to be killed while
334     %% writing out the cluster status files - bad things can then
335     %% happen.
336     process_flag(trap_exit, true),
337     net_kernel:monitor_nodes(true, [nodedown_reason]),
338     {ok, _} = mnesia:subscribe(system),
339     %% If the node has been restarted, Mnesia can trigger a system notification
340     %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
341     %% the inconsistent database event never arriving, we being monitoring all running
342     %% nodes as early as possible. The rest of the monitoring ops will only be triggered
343     %% when notifications arrive.
344     Nodes = possibly_partitioned_nodes(),
345     startup_log(Nodes),
346     Monitors = lists:foldl(fun(Node, Monitors0) ->
347                                    pmon:monitor({rabbit, Node}, Monitors0)
348                            end, pmon:new(), Nodes),
349     {ok, ensure_keepalive_timer(#state{monitors    = Monitors,
350                                        subscribers = pmon:new(),
351                                        partitions  = [],
352                                        guid        = rabbit_guid:gen(),
353                                        node_guids  = orddict:new(),
354                                        autoheal    = rabbit_autoheal:init()})}.
355
356 handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
357     {reply, Partitions, State};
358
359 handle_call(status, _From, State = #state{partitions = Partitions}) ->
360     {reply, [{partitions, Partitions},
361              {nodes,      [node() | nodes()]}], State};
362
363 handle_call(_Request, _From, State) ->
364     {noreply, State}.
365
366 handle_cast(notify_node_up, State = #state{guid = GUID}) ->
367     Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
368     gen_server:abcast(Nodes, ?SERVER,
369                       {node_up, node(), rabbit_mnesia:node_type(), GUID}),
370     %% register other active rabbits with this rabbit
371     DiskNodes = rabbit_mnesia:cluster_nodes(disc),
372     [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
373                                                true  -> disc;
374                                                false -> ram
375                                            end}) || N <- Nodes],
376     {noreply, State};
377
378 %%----------------------------------------------------------------------------
379 %% Partial partition detection
380 %%
381 %% Every node generates a GUID each time it starts, and announces that
382 %% GUID in 'node_up', with 'announce_guid' sent by return so the new
383 %% node knows the GUIDs of the others. These GUIDs are sent in all the
384 %% partial partition related messages to ensure that we ignore partial
385 %% partition messages from before we restarted (to avoid getting stuck
386 %% in a loop).
387 %%
388 %% When one node gets nodedown from another, it then sends
389 %% 'check_partial_partition' to all the nodes it still thinks are
390 %% alive. If any of those (intermediate) nodes still see the "down"
391 %% node as up, they inform it that this has happened. The original
392 %% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
393 %% disconnect from the intermediate node to "upgrade" to a full
394 %% partition.
395 %%
396 %% In pause_minority mode it will instead immediately pause until all
397 %% nodes come back. This is because the contract for pause_minority is
398 %% that nodes should never sit in a partitioned state - if it just
399 %% disconnected, it would become a minority, pause, realise it's not
400 %% in a minority any more, and come back, still partitioned (albeit no
401 %% longer partially).
402 %% ----------------------------------------------------------------------------
403
404 handle_cast({node_up, Node, NodeType, GUID},
405             State = #state{guid       = MyGUID,
406                            node_guids = GUIDs}) ->
407     cast(Node, {announce_guid, node(), MyGUID}),
408     GUIDs1 = orddict:store(Node, GUID, GUIDs),
409     handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
410
411 handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
412     {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};
413
414 handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
415             State = #state{guid       = MyGUID,
416                            node_guids = GUIDs}) ->
417     case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
418         orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of
419         true  -> spawn_link( %%[1]
420                    fun () ->
421                            case rpc:call(Node, rabbit, is_running, []) of
422                                {badrpc, _} -> ok;
423                                _           ->
424                                    rabbit_log:warning("Received a 'DOWN' message"
425                                                       " from ~p but still can"
426                                                       " communicate with it ~n",
427                                                       [Node]),
428                                    cast(Rep, {partial_partition,
429                                                          Node, node(), RepGUID})
430                            end
431                    end);
432         false -> ok
433     end,
434     {noreply, State};
435 %% [1] We checked that we haven't heard the node go down - but we
436 %% really should make sure we can actually communicate with
437 %% it. Otherwise there's a race where we falsely detect a partial
438 %% partition.
439 %%
440 %% Now of course the rpc:call/4 may take a long time to return if
441 %% connectivity with the node is actually interrupted - but that's OK,
442 %% we only really want to do something in a timely manner if
443 %% connectivity is OK. However, of course as always we must not block
444 %% the node monitor, so we do the check in a separate process.
445
446 handle_cast({check_partial_partition, _Node, _Reporter,
447              _NodeGUID, _GUID, _ReporterGUID}, State) ->
448     {noreply, State};
449
450 handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
451             State = #state{guid = MyGUID}) ->
452     FmtBase = "Partial partition detected:~n"
453         " * We saw DOWN from ~s~n"
454         " * We can still see ~s which can see ~s~n",
455     ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
456     case application:get_env(rabbit, cluster_partition_handling) of
457         {ok, pause_minority} ->
458             rabbit_log:error(
459               FmtBase ++ " * pause_minority mode enabled~n"
460               "We will therefore pause until the *entire* cluster recovers~n",
461               ArgsBase),
462             await_cluster_recovery(fun all_nodes_up/0),
463             {noreply, State};
464         {ok, {pause_if_all_down, PreferredNodes, _}} ->
465             case in_preferred_partition(PreferredNodes) of
466                 true  -> rabbit_log:error(
467                            FmtBase ++ "We will therefore intentionally "
468                            "disconnect from ~s~n", ArgsBase ++ [Proxy]),
469                          upgrade_to_full_partition(Proxy);
470                 false -> rabbit_log:info(
471                            FmtBase ++ "We are about to pause, no need "
472                            "for further actions~n", ArgsBase)
473             end,
474             {noreply, State};
475         {ok, _} ->
476             rabbit_log:error(
477               FmtBase ++ "We will therefore intentionally disconnect from ~s~n",
478               ArgsBase ++ [Proxy]),
479             upgrade_to_full_partition(Proxy),
480             {noreply, State}
481     end;
482
483 handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
484     {noreply, State};
485
486 %% Sometimes it appears the Erlang VM does not give us nodedown
487 %% messages reliably when another node disconnects from us. Therefore
488 %% we are told just before the disconnection so we can reciprocate.
489 handle_cast({partial_partition_disconnect, Other}, State) ->
490     rabbit_log:error("Partial partition disconnect from ~s~n", [Other]),
491     disconnect(Other),
492     {noreply, State};
493
494 %% Note: when updating the status file, we can't simply write the
495 %% mnesia information since the message can (and will) overtake the
496 %% mnesia propagation.
497 handle_cast({node_up, Node, NodeType},
498             State = #state{monitors = Monitors}) ->
499     rabbit_log:info("rabbit on node ~p up~n", [Node]),
500     {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
501     write_cluster_status({add_node(Node, AllNodes),
502                           case NodeType of
503                               disc -> add_node(Node, DiscNodes);
504                               ram  -> DiscNodes
505                           end,
506                           add_node(Node, RunningNodes)}),
507     ok = handle_live_rabbit(Node),
508     Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
509                     true ->
510                         Monitors;
511                     false ->
512                         pmon:monitor({rabbit, Node}, Monitors)
513                 end,
514     {noreply, maybe_autoheal(State#state{monitors = Monitors1})};
515
516 handle_cast({joined_cluster, Node, NodeType}, State) ->
517     {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
518     write_cluster_status({add_node(Node, AllNodes),
519                           case NodeType of
520                               disc -> add_node(Node, DiscNodes);
521                               ram  -> DiscNodes
522                           end,
523                           RunningNodes}),
524     {noreply, State};
525
526 handle_cast({left_cluster, Node}, State) ->
527     {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
528     write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
529                           del_node(Node, RunningNodes)}),
530     {noreply, State};
531
532 handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
533     {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
534
535 handle_cast(keepalive, State) ->
536     {noreply, State};
537
538 handle_cast(_Msg, State) ->
539     {noreply, State}.
540
541 handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
542             State = #state{monitors = Monitors, subscribers = Subscribers}) ->
543     rabbit_log:info("rabbit on node ~p down~n", [Node]),
544     {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
545     write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
546     [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
547     {noreply, handle_dead_rabbit(
548                 Node,
549                 State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
550
551 handle_info({'DOWN', _MRef, process, Pid, _Reason},
552             State = #state{subscribers = Subscribers}) ->
553     {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
554
555 handle_info({nodedown, Node, Info}, State = #state{guid       = MyGUID,
556                                                    node_guids = GUIDs}) ->
557     rabbit_log:info("node ~p down: ~p~n",
558                     [Node, proplists:get_value(nodedown_reason, Info)]),
559     Check = fun (N, CheckGUID, DownGUID) ->
560                     cast(N, {check_partial_partition,
561                              Node, node(), DownGUID, CheckGUID, MyGUID})
562             end,
563     case orddict:find(Node, GUIDs) of
564         {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
565                               -- [node(), Node],
566                           [case orddict:find(N, GUIDs) of
567                                {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
568                                error           -> ok
569                            end || N <- Alive];
570         error          -> ok
571     end,
572     {noreply, handle_dead_node(Node, State)};
573
574 handle_info({nodeup, Node, _Info}, State) ->
575     rabbit_log:info("node ~p up~n", [Node]),
576     {noreply, State};
577
578 handle_info({mnesia_system_event,
579              {inconsistent_database, running_partitioned_network, Node}},
580             State = #state{partitions = Partitions,
581                            monitors   = Monitors}) ->
582     %% We will not get a node_up from this node - yet we should treat it as
583     %% up (mostly).
584     State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
585                  true  -> State;
586                  false -> State#state{
587                             monitors = pmon:monitor({rabbit, Node}, Monitors)}
588              end,
589     ok = handle_live_rabbit(Node),
590     Partitions1 = lists:usort([Node | Partitions]),
591     {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};
592
593 handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
594                                                 partitions = Partitions}) ->
595     AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
596     {noreply, State#state{autoheal = AState1}};
597
598 handle_info(ping_down_nodes, State) ->
599     %% We ping nodes when some are down to ensure that we find out
600     %% about healed partitions quickly. We ping all nodes rather than
601     %% just the ones we know are down for simplicity; it's not expensive
602     %% to ping the nodes that are up, after all.
603     State1 = State#state{down_ping_timer = undefined},
604     Self = self(),
605     %% We ping in a separate process since in a partition it might
606     %% take some noticeable length of time and we don't want to block
607     %% the node monitor for that long.
608     spawn_link(fun () ->
609                        ping_all(),
610                        case all_nodes_up() of
611                            true  -> ok;
612                            false -> Self ! ping_down_nodes_again
613                        end
614                end),
615     {noreply, State1};
616
617 handle_info(ping_down_nodes_again, State) ->
618     {noreply, ensure_ping_timer(State)};
619
620 handle_info(ping_up_nodes, State) ->
621     %% In this case we need to ensure that we ping "quickly" -
622     %% i.e. only nodes that we know to be up.
623     [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
624     {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};
625
626 handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) ->
627     AState = rabbit_autoheal:process_down(Info, AState0),
628     {noreply, State#state{autoheal = AState}};
629
630 handle_info(_Info, State) ->
631     {noreply, State}.
632
633 terminate(_Reason, State) ->
634     rabbit_misc:stop_timer(State, #state.down_ping_timer),
635     ok.
636
637 code_change(_OldVsn, State, _Extra) ->
638     {ok, State}.
639
640 %%----------------------------------------------------------------------------
641 %% Functions that call the module specific hooks when nodes go up/down
642 %%----------------------------------------------------------------------------
643
644 handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
645     %% In general in rabbit_node_monitor we care about whether the
646     %% rabbit application is up rather than the node; we do this so
647     %% that we can respond in the same way to "rabbitmqctl stop_app"
648     %% and "rabbitmqctl stop" as much as possible.
649     %%
650     %% However, for pause_minority and pause_if_all_down modes we can't do
651     %% this, since we depend on looking at whether other nodes are up
652     %% to decide whether to come back up ourselves - if we decide that
653     %% based on the rabbit application we would go down and never come
654     %% back.
655     case application:get_env(rabbit, cluster_partition_handling) of
656         {ok, pause_minority} ->
657             case majority([Node]) of
658                 true  -> ok;
659                 false -> await_cluster_recovery(fun majority/0)
660             end,
661             State;
662         {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
663             case in_preferred_partition(PreferredNodes, [Node]) of
664                 true  -> ok;
665                 false -> await_cluster_recovery(
666                            fun in_preferred_partition/0)
667             end,
668             case HowToRecover of
669                 autoheal -> State#state{autoheal =
670                               rabbit_autoheal:node_down(Node, Autoheal)};
671                 _        -> State
672             end;
673         {ok, ignore} ->
674             State;
675         {ok, autoheal} ->
676             State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
677         {ok, Term} ->
678             rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
679                                "assuming 'ignore'~n", [Term]),
680             State
681     end.
682
683 await_cluster_recovery(Condition) ->
684     rabbit_log:warning("Cluster minority/secondary status detected - "
685                        "awaiting recovery~n", []),
686     run_outside_applications(fun () ->
687                                      rabbit:stop(),
688                                      wait_for_cluster_recovery(Condition)
689                              end, false),
690     ok.
691
692 run_outside_applications(Fun, WaitForExistingProcess) ->
693     spawn_link(fun () ->
694                        %% Ignore exit messages from the monitor - the link is needed
695                        %% to ensure the monitor detects abnormal exits from this process
696                        %% and can reset the 'restarting' status on the autoheal, avoiding
697                        %% a deadlock. The monitor is restarted when rabbit does, so messages
698                        %% in the other direction should be ignored.
699                        process_flag(trap_exit, true),
700                        %% If our group leader is inside an application we are about
701                        %% to stop, application:stop/1 does not return.
702                        group_leader(whereis(init), self()),
703                        register_outside_app_process(Fun, WaitForExistingProcess)
704                end).
705
706 register_outside_app_process(Fun, WaitForExistingProcess) ->
707     %% Ensure only one such process at a time, the exit(badarg) is
708     %% harmless if one is already running.
709     %%
710     %% If WaitForExistingProcess is false, the given fun is simply not
711     %% executed at all and the process exits.
712     %%
713     %% If WaitForExistingProcess is true, we wait for the end of the
714     %% currently running process before executing the given function.
715     try register(rabbit_outside_app_process, self()) of
716         true ->
717             do_run_outside_app_fun(Fun)
718     catch
719         error:badarg when WaitForExistingProcess ->
720             MRef = erlang:monitor(process, rabbit_outside_app_process),
721             receive
722                 {'DOWN', MRef, _, _, _} ->
723                     %% The existing process exited, let's try to
724                     %% register again.
725                     register_outside_app_process(Fun, WaitForExistingProcess)
726             end;
727         error:badarg ->
728             ok
729     end.
730
731 do_run_outside_app_fun(Fun) ->
732     try
733         Fun()
734     catch _:E ->
735             rabbit_log:error(
736               "rabbit_outside_app_process:~n~p~n~p~n",
737               [E, erlang:get_stacktrace()])
738     end.
739
740 wait_for_cluster_recovery(Condition) ->
741     ping_all(),
742     case Condition() of
743         true  -> rabbit:start();
744         false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
745                  wait_for_cluster_recovery(Condition)
746     end.
747
748 handle_dead_rabbit(Node, State = #state{partitions = Partitions,
749                                         autoheal   = Autoheal}) ->
750     %% TODO: This may turn out to be a performance hog when there are
751     %% lots of nodes.  We really only need to execute some of these
752     %% statements on *one* node, rather than all of them.
753     ok = rabbit_networking:on_node_down(Node),
754     ok = rabbit_amqqueue:on_node_down(Node),
755     ok = rabbit_alarm:on_node_down(Node),
756     ok = rabbit_mnesia:on_node_down(Node),
757     %% If we have been partitioned, and we are now in the only remaining
758     %% partition, we no longer care about partitions - forget them. Note
759     %% that we do not attempt to deal with individual (other) partitions
760     %% going away. It's only safe to forget anything about partitions when
761     %% there are no partitions.
762     Down = Partitions -- alive_rabbit_nodes(),
763     NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
764     Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
765                       [] -> [];
766                       _  -> Partitions
767                   end,
768     ensure_ping_timer(
769       State#state{partitions = Partitions1,
770                   autoheal   = rabbit_autoheal:rabbit_down(Node, Autoheal)}).
771
772 ensure_ping_timer(State) ->
773     rabbit_misc:ensure_timer(
774       State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
775       ping_down_nodes).
776
777 ensure_keepalive_timer(State) ->
778     {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
779     rabbit_misc:ensure_timer(
780       State, #state.keepalive_timer, Interval, ping_up_nodes).
781
782 handle_live_rabbit(Node) ->
783     ok = rabbit_amqqueue:on_node_up(Node),
784     ok = rabbit_alarm:on_node_up(Node),
785     ok = rabbit_mnesia:on_node_up(Node).
786
787 maybe_autoheal(State = #state{partitions = []}) ->
788     State;
789
790 maybe_autoheal(State = #state{autoheal = AState}) ->
791     case all_nodes_up() of
792         true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
793         false -> State
794     end.
795
796 %%--------------------------------------------------------------------
797 %% Internal utils
798 %%--------------------------------------------------------------------
799
800 try_read_file(FileName) ->
801     case rabbit_file:read_term_file(FileName) of
802         {ok, Term}      -> {ok, Term};
803         {error, enoent} -> {error, enoent};
804         {error, E}      -> throw({error, {cannot_read_file, FileName, E}})
805     end.
806
807 legacy_cluster_nodes(Nodes) ->
808     %% We get all the info that we can, including the nodes from
809     %% mnesia, which will be there if the node is a disc node (empty
810     %% list otherwise)
811     lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
812
813 legacy_disc_nodes(AllNodes) ->
814     case AllNodes == [] orelse lists:member(node(), AllNodes) of
815         true  -> [node()];
816         false -> []
817     end.
818
819 add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
820
821 del_node(Node, Nodes) -> Nodes -- [Node].
822
823 cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).
824
825 upgrade_to_full_partition(Proxy) ->
826     cast(Proxy, {partial_partition_disconnect, node()}),
827     disconnect(Proxy).
828
829 %% When we call this, it's because we want to force Mnesia to detect a
830 %% partition. But if we just disconnect_node/1 then Mnesia won't
831 %% detect a very short partition. So we want to force a slightly
832 %% longer disconnect. Unfortunately we don't have a way to blacklist
833 %% individual nodes; the best we can do is turn off auto-connect
834 %% altogether.
835 disconnect(Node) ->
836     application:set_env(kernel, dist_auto_connect, never),
837     erlang:disconnect_node(Node),
838     timer:sleep(1000),
839     application:unset_env(kernel, dist_auto_connect),
840     ok.
841
842 %%--------------------------------------------------------------------
843
844 %% mnesia:system_info(db_nodes) (and hence
845 %% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
846 %% when partitioned, just those that we are sharing Mnesia state
847 %% with. So we have a small set of replacement functions
848 %% here. "rabbit" in a function's name implies we test if the rabbit
849 %% application is up, not just the node.
850
851 %% As we use these functions to decide what to do in pause_minority or
852 %% pause_if_all_down states, they *must* be fast, even in the case where
853 %% TCP connections are timing out. So that means we should be careful
854 %% about whether we connect to nodes which are currently disconnected.
855
856 majority() ->
857     majority([]).
858
859 majority(NodesDown) ->
860     Nodes = rabbit_mnesia:cluster_nodes(all),
861     AliveNodes = alive_nodes(Nodes) -- NodesDown,
862     length(AliveNodes) / length(Nodes) > 0.5.
863
864 in_preferred_partition() ->
865     {ok, {pause_if_all_down, PreferredNodes, _}} =
866         application:get_env(rabbit, cluster_partition_handling),
867     in_preferred_partition(PreferredNodes).
868
869 in_preferred_partition(PreferredNodes) ->
870     in_preferred_partition(PreferredNodes, []).
871
872 in_preferred_partition(PreferredNodes, NodesDown) ->
873     Nodes = rabbit_mnesia:cluster_nodes(all),
874     RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
875     AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
876     RealPreferredNodes =:= [] orelse AliveNodes =/= [].
877
878 all_nodes_up() ->
879     Nodes = rabbit_mnesia:cluster_nodes(all),
880     length(alive_nodes(Nodes)) =:= length(Nodes).
881
882 all_rabbit_nodes_up() ->
883     Nodes = rabbit_mnesia:cluster_nodes(all),
884     length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
885
886 alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
887 alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
888
889 alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
890
891 alive_rabbit_nodes(Nodes) ->
892     [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
893
894 %% This one is allowed to connect!
895 ping_all() ->
896     [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
897     ok.
898
899 possibly_partitioned_nodes() ->
900     alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
901
902 startup_log([]) ->
903     rabbit_log:info("Starting rabbit_node_monitor~n", []);
904 startup_log(Nodes) ->
905     rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
906                     [Nodes]).