]> review.fuel-infra Code Review - packages/trusty/rabbitmq-server.git/blob
68e3253241027204e2895fd9013f02cb98844c54
[packages/trusty/rabbitmq-server.git] /
1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
5 %%
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
10 %%
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
12 %%
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc.  All rights reserved.
15 %%
16
17 -module(rabbit_exchange_type_consistent_hash).
18 -include_lib("rabbit_common/include/rabbit.hrl").
19
20 -behaviour(rabbit_exchange_type).
21
22 -export([description/0, serialise_events/0, route/2]).
23 -export([validate/1, validate_binding/2,
24          create/2, delete/3, policy_changed/2,
25          add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
26 -export([init/0]).
27
%% One slot ("bucket") on an exchange's hash ring; rows of this shape
%% are what add_binding/3 writes to the plugin's mnesia table.
%%   source_number - {Source, Number}: the binding's source plus the
%%                   bucket's position in the hash space. It is the
%%                   first field and therefore the mnesia key.
%%   destination   - the destination of the binding owning the bucket;
%%                   this is what route/2 returns.
%%   binding       - the complete #binding{} the bucket was created for.
-record(bucket, {
          source_number,
          destination,
          binding
         }).
29
%% Boot step: register this module with rabbit_registry as the
%% implementation of the <<"x-consistent-hash">> exchange type. The
%% cleanup MFA removes that registration again.
-rabbit_boot_step({rabbit_exchange_type_consistent_hash_registry,
                   [{description,
                     "exchange type x-consistent-hash: registry"},
                    {mfa,
                     {rabbit_registry, register,
                      [exchange, <<"x-consistent-hash">>, ?MODULE]}},
                    {requires, rabbit_registry},
                    {enables,  kernel_ready},
                    {cleanup,
                     {rabbit_registry, unregister,
                      [exchange, <<"x-consistent-hash">>]}}]}).
39
%% Boot step: call init/0 of this module, which sets up the mnesia
%% table holding the buckets. Declared as requiring the `database`
%% step and enabling `external_infrastructure`.
-rabbit_boot_step({rabbit_exchange_type_consistent_hash_mnesia,
                   [{description,
                     "exchange type x-consistent-hash: mnesia"},
                    {mfa,      {?MODULE, init, []}},
                    {requires, database},
                    {enables,  external_infrastructure}]}).
46
%% The mnesia table holding the buckets is named after this module.
-define(TABLE, ?MODULE).

%% Size of the hash space (2^27 = 134217728). It is passed as the range
%% to erlang:phash2/2 and bounds the bucket numbers that are generated.
-define(PHASH2_RANGE, (1 bsl 27)).
49
%% Exchange type callback: a property list describing this exchange
%% type. It holds a single `description` entry with a binary text.
description() ->
    Text = <<"Consistent Hashing Exchange">>,
    [{description, Text}].
52
%% Exchange type callback: always answers `false`.
serialise_events() ->
    false.
54
%% Route a delivery to at most one destination.
%%
%% The value to hash is chosen by hash/2: the message's routing keys
%% when the exchange has no <<"hash-header">> argument, otherwise the
%% result of looking up the named header. It is reduced to a point in
%% 0..?PHASH2_RANGE-1 with erlang:phash2/2. The answer is the
%% destination of the first bucket of this exchange whose number is at
%% or above that point; when no such bucket exists we wrap around to
%% the exchange's first bucket. An exchange without buckets routes
%% nowhere ([]).
route(#exchange { name = XName, arguments = XArgs },
      #delivery { message = Message }) ->
    HeaderArg = rabbit_misc:table_lookup(XArgs, <<"hash-header">>),
    Point = erlang:phash2(hash(HeaderArg, Message), ?PHASH2_RANGE),

    %% We go behind mnesia's back and query its table through ets.
    %% That is naughty, but RabbitMQ-server does the same, for good
    %% reasons.
    %%
    %% The select below makes the table scan linearly over the entries
    %% carrying the right exchange name. Keeping, say, a tree per
    %% exchange inside a single mnesia entry sounds smarter, but such
    %% deep values are expensive to copy onto the process heap again
    %% and again, and have not turned out to be noticeably faster.
    AtOrAbovePoint = [{#bucket { source_number = {XName, '$2'},
                                 destination   = '$1',
                                 _             = '_' },
                       [{'>=', '$2', Point}],
                       ['$1']}],
    case ets:select(?TABLE, AtOrAbovePoint, 1) of
        {Destinations, _Continuation} ->
            Destinations;
        '$end_of_table' ->
            %% Nothing at or above the point: wrap around the ring.
            AnyBucket = #bucket { source_number = {XName, '_'}, _ = '_' },
            case ets:match_object(?TABLE, AnyBucket, 1) of
                {[First], _Cont} -> [First#bucket.destination];
                _                -> []
            end
    end.
86
%% Exchange type callback: every exchange declaration is accepted.
validate(_Exchange) ->
    ok.
88
%% Exchange type callback: check a binding before it is added.
%%
%% add_binding/3 converts the binding key with
%% list_to_integer(binary_to_list(K)) and uses the result as the number
%% of buckets to create. A key that is not an integer would crash
%% there, inside the mnesia transaction, and a key below 1 cannot
%% produce any bucket. Both are rejected here instead.
%%
%% Returns `ok`, or `{error, {binding_invalid, Format, Args}}` where
%% Format/Args describe the problem.
validate_binding(_X, #binding { key = K }) ->
    %% Only the conversion is protected by the try; the `of` clauses
    %% run outside of it.
    try list_to_integer(binary_to_list(K)) of
        Weight when Weight < 1 ->
            {error, {binding_invalid,
                     "The binding key must be greater than 0", []}};
        _Weight ->
            ok
    catch
        error:badarg ->
            {error, {binding_invalid,
                     "The binding key must be an integer: ~p", [K]}}
    end.
90
%% Exchange type callback: nothing to do when an exchange is created.
create(_Tx, _Exchange) ->
    ok.
92
%% Exchange type callback: when an exchange is deleted, drop all of its
%% buckets.
%%
%% In the `transaction` phase the table is write-locked and every
%% bucket whose key carries the exchange's name is deleted. Any other
%% phase is a no-op.
delete(transaction, #exchange { name = XName }, _Bs) ->
    ok = mnesia:write_lock_table(?TABLE),
    OwnBuckets = #bucket { source_number = {XName, '_'}, _ = '_' },
    lists:foreach(
      fun (Bucket) ->
              ok = mnesia:delete_object(?TABLE, Bucket, write)
      end,
      mnesia:match_object(?TABLE, OwnBuckets, write)),
    ok;
delete(_Tx, _X, _Bs) ->
    ok.
100
%% Exchange type callback: policy changes need no action here.
policy_changed(_OldExchange, _NewExchange) ->
    ok.
102
%% Exchange type callback: create the buckets for a new binding.
%%
%% In the `transaction` phase, unless the binding already owns a
%% bucket, the binding key is parsed as an integer (a non-integer key
%% raises badarg), capped at ?PHASH2_RANGE, and that many buckets with
%% free random numbers are written for {Source, Number}. The `none`
%% phase is a no-op.
add_binding(transaction, _X,
            #binding { source = S, destination = D, key = K } = B) ->
    case binding_has_bucket(B) of
        true ->
            ok;
        false ->
            ok = mnesia:write_lock_table(?TABLE),
            Weight = list_to_integer(binary_to_list(K)),
            BucketCount = erlang:min(Weight, ?PHASH2_RANGE),
            lists:foreach(
              fun (N) ->
                      ok = mnesia:write(?TABLE,
                                        #bucket { source_number = {S, N},
                                                  destination   = D,
                                                  binding       = B },
                                        write)
              end,
              find_numbers(S, BucketCount, [])),
            ok
    end;
add_binding(none, _X, _B) ->
    ok.

%% True when at least one bucket is stored for binding B.
%%
%% Uses select rather than match_object so that the number of results
%% can be limited and the results themselves are not copied over to
%% this process.
binding_has_bucket(B) ->
    MatchSpec = [{#bucket { binding = B, _ = '_' }, [], [ok]}],
    any_selected(mnesia:select(?TABLE, MatchSpec, 1, read)).

%% Walk the chunks of a limited mnesia select until one contains a
%% match. The limit given to mnesia:select/4 is only a recommendation:
%% a chunk may be empty although matches remain, so an empty chunk
%% must not be taken as an answer; follow the continuation instead.
any_selected('$end_of_table') -> false;
any_selected({[], Cont})      -> any_selected(mnesia:select(Cont));
any_selected({[_ | _], _})    -> true.
126
%% Exchange type callback: delete the buckets of removed bindings.
%%
%% In the `transaction` phase the table is write-locked; then, for
%% each binding in turn, the keys of all buckets created for it are
%% selected and deleted. The `none` phase is a no-op.
remove_bindings(transaction, _X, Bindings) ->
    ok = mnesia:write_lock_table(?TABLE),
    lists:foreach(
      fun (Binding) ->
              KeysOfBinding = [{#bucket { source_number = '$1',
                                          binding       = Binding,
                                          _             = '_' },
                                [],
                                ['$1']}],
              lists:foreach(
                fun (Key) -> ok = mnesia:delete(?TABLE, Key, write) end,
                mnesia:select(?TABLE, KeysOfBinding, write))
      end,
      Bindings),
    ok;
remove_bindings(none, _X, _Bs) ->
    ok.
139
%% Exchange type callback: argument equivalence is delegated unchanged
%% to rabbit_exchange:assert_args_equivalence/2.
assert_args_equivalence(Exchange, RequestedArgs) ->
    rabbit_exchange:assert_args_equivalence(Exchange, RequestedArgs).
142
%% Set up the mnesia table that stores the buckets; run from a boot
%% step. The table is an ordered_set of #bucket{} records, gets a
%% ram_copies replica on this node, and is waited for up to 30 seconds.
%%
%% The results of the three mnesia calls are not inspected, so this
%% function returns `ok` whatever they are.
%% NOTE(review): presumably so that an already existing table or copy
%% does not abort the boot - confirm.
init() ->
    TableDef = [{record_name, bucket},
                {attributes,  record_info(fields, bucket)},
                {type,        ordered_set}],
    _ = mnesia:create_table(?TABLE, TableDef),
    _ = mnesia:add_table_copy(?TABLE, node(), ram_copies),
    _ = mnesia:wait_for_tables([?TABLE], 30000),
    ok.
150
%% Pick N bucket numbers that are still free for Source and return them
%% prepended to Acc.
%%
%% Each candidate is drawn uniformly from 0..?PHASH2_RANGE-1
%% (random:uniform/1 yields 1..?PHASH2_RANGE, hence the -1). A
%% candidate is free when no record is stored under {Source, Number};
%% the read takes a write lock. Taken candidates are simply redrawn.
%% Must be called inside an mnesia transaction.
%%
%% The base case is guarded with `=< 0` rather than matching 0 exactly,
%% so that a negative count ends the recursion instead of counting
%% down forever.
find_numbers(_Source, N, Acc) when N =< 0 ->
    Acc;
find_numbers(Source, N, Acc) ->
    Number = random:uniform(?PHASH2_RANGE) - 1,
    case mnesia:read(?TABLE, {Source, Number}, write) of
        []  -> find_numbers(Source, N - 1, [Number | Acc]);
        [_] -> find_numbers(Source, N, Acc)
    end.
159
%% Choose the term that route/2 hashes for a message.
%%
%% First argument `undefined`: the message's routing keys.
%% First argument `{longstr, Header}`: the result of looking Header up
%% in the message headers, or `undefined` when
%% rabbit_basic:extract_headers/1 reports no headers at all.
hash(undefined, #basic_message { routing_keys = RoutingKeys }) ->
    RoutingKeys;
hash({longstr, Header}, #basic_message { content = Content }) ->
    case rabbit_basic:extract_headers(Content) of
        undefined -> undefined;
        Headers   -> rabbit_misc:table_lookup(Headers, Header)
    end.