]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
381d0542ec273cbf7d90baab19184e8dc58e9343
[packages/trusty/rabbitmq-server.git] /
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_exchange_type_consistent_hash).
18 -include_lib("rabbit_common/include/rabbit.hrl").
19
20 -behaviour(rabbit_exchange_type).
21
22 -export([description/0, serialise_events/0, route/2]).
23 -export([validate/1, validate_binding/2,
24          create/2, delete/3, policy_changed/2,
25          add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
26 -export([init/0]).
27
%% One row of the bucket table. Fields, in order:
%%   source_number - {Source, Number}: the source of the binding paired
%%                   with the number allocated to this bucket (see
%%                   add_binding/3 and find_numbers/3); rows are read by
%%                   this value
%%   destination   - destination of the binding that owns the bucket
%%   binding       - the complete #binding{} record the bucket was
%%                   created for
-record(bucket, {source_number,
                 destination,
                 binding}).
29
%% Boot step: registers this module with rabbit_registry as the
%% implementation of the <<"x-consistent-hash">> exchange type.
-rabbit_boot_step({rabbit_exchange_type_consistent_hash_registry,
                   [{description, "exchange type x-consistent-hash: registry"},
                    {mfa,         {rabbit_registry, register,
                                   [exchange, <<"x-consistent-hash">>, ?MODULE]}},
                    {requires,    rabbit_registry},
                    {enables,     kernel_ready}]}).
37
%% Boot step: calls init/0 in this module, which sets up the mnesia
%% table that holds the buckets.
-rabbit_boot_step({rabbit_exchange_type_consistent_hash_mnesia,
                   [{description, "exchange type x-consistent-hash: mnesia"},
                    {mfa,         {?MODULE, init, []}},
                    {requires,    database},
                    {enables,     external_infrastructure}]}).
44
%% Name of the mnesia table holding the buckets; it is simply the
%% module name.
-define(TABLE, ?MODULE).

%% Size of the hash space (2^27). Used both as the range passed to
%% erlang:phash2/2 in route/2 and as the range bucket numbers are
%% drawn from in find_numbers/3.
-define(PHASH2_RANGE, 134217728).
47
%% Returns the descriptive property list for this exchange type: a
%% single `description' entry holding a human-readable binary.
description() ->
    Text = <<"Consistent Hashing Exchange">>,
    [{description, Text}].
50
%% Always answers `false'.
%% NOTE(review): presumably this tells the broker that callbacks for
%% this exchange type need not be serialised - confirm against the
%% rabbit_exchange_type behaviour.
serialise_events() ->
    false.
52
%% Picks the destination for a delivery.
%%
%% The value to hash is chosen by hash/2: either the message's routing
%% keys, or the message header named by the exchange's "hash-header"
%% argument. That value is hashed into [0, ?PHASH2_RANGE) and the first
%% bucket of this exchange whose number is >= the hash supplies the
%% destination. If no such bucket exists the lookup wraps around to the
%% first bucket of the exchange. Returns a list of destinations, which
%% is empty when the exchange has no buckets at all.
route(#exchange { name = XName, arguments = XArgs },
      #delivery { message = Message }) ->
    %% The mnesia table is deliberately read through ets here;
    %% RabbitMQ-server itself does the same, and for good reasons.
    %%
    %% The original authors note that this select makes a linear scan
    %% over the entries belonging to the exchange. Alternatives such as
    %% keeping a tree per exchange inside a single mnesia entry produce
    %% deep terms that are expensive to copy onto the process heap, and
    %% were not found to be noticeably faster.
    HashHeader = rabbit_misc:table_lookup(XArgs, <<"hash-header">>),
    Point = erlang:phash2(hash(HashHeader, Message), ?PHASH2_RANGE),
    AtOrAfterPoint = [{#bucket { source_number = {XName, '$2'},
                                 destination   = '$1',
                                 _             = '_' },
                       [{'>=', '$2', Point}],
                       ['$1']}],
    case ets:select(?TABLE, AtOrAfterPoint, 1) of
        {Destinations, _Continuation} -> Destinations;
        '$end_of_table'               -> route_wrap_around(XName)
    end.

%% Fallback for route/2: destination of the first bucket belonging to
%% the exchange, as a one-element list, or [] when there is none.
route_wrap_around(XName) ->
    AnyBucket = #bucket { source_number = {XName, '_'}, _ = '_' },
    case ets:match_object(?TABLE, AnyBucket, 1) of
        {[First], _Cont} -> [First#bucket.destination];
        _                -> []
    end.
84
%% Exchange validation callback: performs no checks and accepts every
%% exchange.
validate(_Exchange) ->
    ok.
86
%% Binding validation callback.
%%
%% add_binding/3 turns the binding key into the number of buckets with
%% list_to_integer(binary_to_list(Key)). Checking the key here means a
%% malformed key is rejected with an error instead of crashing inside
%% the add_binding/3 transaction, and a negative key is rejected
%% instead of sending find_numbers/3 into a recursion that never
%% reaches zero.
%%
%% Returns `ok' when the key is the decimal text of an integer >= 0
%% (zero is still accepted: it creates a binding with no buckets),
%% otherwise {error, {binding_invalid, Format, Args}}.
validate_binding(_X, #binding { key = K }) ->
    try list_to_integer(binary_to_list(K)) of
        N when N >= 0 ->
            ok;
        _Negative ->
            {error, {binding_invalid,
                     "The binding key must not be negative: ~p", [K]}}
    catch
        %% Raised by binary_to_list/1 or list_to_integer/1 when the key
        %% is not the textual form of an integer.
        error:badarg ->
            {error, {binding_invalid,
                     "The binding key must be an integer: ~p", [K]}}
    end.
88
%% Exchange creation callback: nothing needs to be set up when an
%% exchange is created, so both arguments are ignored.
create(_Tx, _Exchange) ->
    ok.
90
%% Exchange deletion callback.
%%
%% In the `transaction' phase every bucket whose source is the deleted
%% exchange is removed from the table, under a table-wide write lock.
%% Any other phase is a no-op. Always returns `ok'.
delete(transaction, #exchange { name = XName }, _Bindings) ->
    ok = mnesia:write_lock_table(?TABLE),
    OwnedByExchange = #bucket { source_number = {XName, '_'}, _ = '_' },
    lists:foreach(
      fun (Bucket) ->
              ok = mnesia:delete_object(?TABLE, Bucket, write)
      end,
      mnesia:match_object(?TABLE, OwnedByExchange, write)),
    ok;
delete(_Tx, _X, _Bindings) ->
    ok.
98
%% Policy change callback: both arguments are ignored and nothing is
%% done.
policy_changed(_OldExchange, _NewExchange) ->
    ok.
100
%% Binding creation callback.
%%
%% In the `transaction' phase, unless the table already holds a bucket
%% for this exact binding, the binding key is parsed as an integer
%% bucket count (capped at ?PHASH2_RANGE) and that many buckets are
%% written, each with a number obtained from find_numbers/3. The `none'
%% phase is a no-op. Always returns `ok'.
add_binding(transaction, _X,
            #binding { source = Source, destination = Dest, key = Key } = Binding) ->
    %% A select limited to one result is used instead of match_object
    %% so that the existing buckets are not copied into this process;
    %% only "is there at least one?" matters.
    ForThisBinding = [{#bucket { binding = Binding, _ = '_' }, [], [ok]}],
    case mnesia:select(?TABLE, ForThisBinding, 1, read) of
        '$end_of_table' ->
            ok = mnesia:write_lock_table(?TABLE),
            Requested = list_to_integer(binary_to_list(Key)),
            BucketCount = lists:min([Requested, ?PHASH2_RANGE]),
            lists:foreach(
              fun (Number) ->
                      ok = mnesia:write(?TABLE,
                                        #bucket { source_number = {Source, Number},
                                                  destination   = Dest,
                                                  binding       = Binding },
                                        write)
              end,
              find_numbers(Source, BucketCount, [])),
            ok;
        _ ->
            %% Buckets for this binding already exist; leave them be.
            ok
    end;
add_binding(none, _X, _B) ->
    ok.
124
%% Binding removal callback.
%%
%% In the `transaction' phase the table is write-locked and, for each
%% removed binding in turn, every bucket created for it is deleted.
%% The `none' phase is a no-op. Always returns `ok'.
remove_bindings(transaction, _X, Bindings) ->
    ok = mnesia:write_lock_table(?TABLE),
    lists:foreach(fun remove_binding_buckets/1, Bindings),
    ok;
remove_bindings(none, _X, _Bs) ->
    ok.

%% Deletes every bucket whose `binding' field is Binding: selects the
%% keys of those buckets, then deletes them one by one.
remove_binding_buckets(Binding) ->
    KeysForBinding = [{#bucket { source_number = '$1',
                                 binding       = Binding,
                                 _             = '_' }, [], ['$1']}],
    lists:foreach(
      fun (Key) ->
              ok = mnesia:delete(?TABLE, Key, write)
      end,
      mnesia:select(?TABLE, KeysForBinding, write)).
137
%% Argument equivalence callback: hands both arguments unchanged to
%% rabbit_exchange:assert_args_equivalence/2 and returns whatever that
%% call returns.
assert_args_equivalence(Exchange, RequiredArgs) ->
    rabbit_exchange:assert_args_equivalence(Exchange, RequiredArgs).
140
%% Sets up the mnesia table that stores the buckets; called from the
%% boot step declared in this module.
%%
%% The table is an ordered_set of #bucket{} records, a ram copy of it
%% is requested on the local node, and the call waits up to 30 seconds
%% for the table to become available. The outcome of each mnesia call
%% is discarded, so this function returns `ok' regardless.
%% NOTE(review): presumably the results are ignored so that a restart
%% where the table or copy already exists is not treated as a failure -
%% confirm that ignoring a wait_for_tables timeout is also intended.
init() ->
    TableDef = [{record_name, bucket},
                {attributes, record_info(fields, bucket)},
                {type, ordered_set}],
    _ = mnesia:create_table(?TABLE, TableDef),
    _ = mnesia:add_table_copy(?TABLE, node(), ram_copies),
    _ = mnesia:wait_for_tables([?TABLE], 30000),
    ok.
148
%% Picks N bucket numbers for Source and prepends them to Acc.
%%
%% Each candidate is drawn at random from [0, ?PHASH2_RANGE) and is
%% kept only if the table has no bucket stored under {Source, Number};
%% otherwise another candidate is drawn. The read takes a write lock,
%% so this must run inside a mnesia transaction.
%%
%% A count of zero or less yields Acc unchanged. The guard matters: if
%% the base case matched only 0, a negative count would recurse without
%% ever terminating.
find_numbers(_Source, N, Acc) when N =< 0 ->
    Acc;
find_numbers(Source, N, Acc) ->
    %% random:uniform/1 yields 1..Range, hence the -1 to get 0..Range-1.
    Number = random:uniform(?PHASH2_RANGE) - 1,
    case mnesia:read(?TABLE, {Source, Number}, write) of
        []  -> find_numbers(Source, N - 1, [Number | Acc]);
        [_] -> find_numbers(Source, N, Acc)
    end.
157
%% Selects the term that route/2 feeds to erlang:phash2/2.
%%
%% The first argument is the result of looking up the exchange's
%% "hash-header" argument:
%%   undefined         - no such argument: use the message's routing
%%                       keys
%%   {longstr, Header} - use the result of looking up Header in the
%%                       message headers, or `undefined' when the
%%                       message has no headers
hash(undefined, #basic_message { routing_keys = Routes }) ->
    Routes;
hash({longstr, Header}, #basic_message { content = Content }) ->
    header_value(rabbit_basic:extract_headers(Content), Header).

%% Looks Header up in the extracted headers, passing an absent header
%% table straight through as `undefined'.
header_value(undefined, _Header) ->
    undefined;
header_value(Headers, Header) ->
    rabbit_misc:table_lookup(Headers, Header).