1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
17 -module(rabbit_exchange_type_consistent_hash).
18 -include_lib("rabbit_common/include/rabbit.hrl").
20 -behaviour(rabbit_exchange_type).
22 -export([description/0, serialise_events/0, route/2]).
23 -export([validate/1, validate_binding/2,
24 create/2, delete/3, policy_changed/2,
25 add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
%% One row of the bucket table (?TABLE).
%%   source_number - {Source, Number} tuple: the exchange name paired with
%%                   a bucket number; rows are read by this value (see
%%                   find_numbers/3) and written with it (add_binding/3).
%%   destination   - the value route/2 hands back for a matching bucket.
%%   binding       - the #binding{} the row was created for; add_binding/3
%%                   matches on it to detect rows that already exist.
28 -record(bucket, {source_number, destination, binding}).
31 {rabbit_exchange_type_consistent_hash_registry,
32 [{description, "exchange type x-consistent-hash: registry"},
33 {mfa, {rabbit_registry, register,
34 [exchange, <<"x-consistent-hash">>, ?MODULE]}},
35 {requires, rabbit_registry},
36 {enables, kernel_ready}]}).
39 {rabbit_exchange_type_consistent_hash_mnesia,
40 [{description, "exchange type x-consistent-hash: mnesia"},
41 {mfa, {?MODULE, init, []}},
43 {enables, external_infrastructure}]}).
%% Name of the table holding the #bucket{} rows: the module's own name.
45 -define(TABLE, ?MODULE).
%% Size of the bucket number space. It is the range given to
%% erlang:phash2/2 in route/2 and the bound used by find_numbers/3 when
%% drawing random bucket numbers, so both yield values in
%% 0..?PHASH2_RANGE-1.
46 -define(PHASH2_RANGE, 134217728). %% 2^27
49 [{description, <<"Consistent Hashing Exchange">>}].
%% Reports whether events for this exchange type must be serialised.
%% This exchange type never asks for that, so the answer is always false.
serialise_events() ->
    false.
%% Picks the destination(s) for a delivery published to exchange Name.
%%
%% The term to hash is chosen by hash/2 from the message and from the
%% result of looking up <<"hash-header">> in Args; erlang:phash2/2 then
%% reduces it to an integer H in 0..?PHASH2_RANGE-1. The bucket table is
%% queried directly through ets (see the comment below) for rows whose
%% source_number is {Name, _}. When a query yields exactly one bucket,
%% that bucket's destination is returned as a one-element list.
%%
%% NOTE(review): presumably Args is the exchange's argument table and H
%% is compared against the bucket number ('$2') in the select guard --
%% confirm against the complete clause, as neither the binding of Args
%% nor the guard is spelled out in the lines below.
53 route(#exchange { name = Name,
55 #delivery { message = Msg }) ->
56 %% Yes, we're being exceptionally naughty here, by using ets on an
57 %% mnesia table. However, RabbitMQ-server itself is just as
58 %% naughty, and for good reasons.
60 %% Note that given the nature of this select, it will force mnesia
61 %% to do a linear scan of the entries in the table that have the
62 %% correct exchange name. More sophisticated solutions include,
63 %% for example, having some sort of tree as the value of a single
64 %% mnesia entry for each exchange. However, such values tend to
65 %% end up as relatively deep data structures which cost a lot to
66 %% continually copy to the process heap. Consequently, such
67 %% approaches have not been found to be much faster, if at all.
68 HashOn = rabbit_misc:table_lookup(Args, <<"hash-header">>),
69 H = erlang:phash2(hash(HashOn, Msg), ?PHASH2_RANGE),
70 case ets:select(?TABLE, [{#bucket { source_number = {Name, '$2'},
76 case ets:match_object(?TABLE, #bucket { source_number = {Name, '_'},
78 {[Bucket], _Cont} -> [Bucket#bucket.destination];
81 {Destinations, _Continuation} ->
%% Binding validation hook. No check is performed here: every
%% exchange/binding pair is accepted and the result is always ok.
validate_binding(_Exchange, _Binding) ->
    ok.
%% Exchange creation hook. Nothing is set up at creation time, so both
%% arguments are ignored and ok is returned.
create(_TxMode, _Exchange) ->
    ok.
%% Exchange deletion hook.
%%
%% When the first argument is the atom transaction, the whole bucket
%% table is write-locked and every #bucket{} row whose source_number is
%% {Name, _} -- i.e. every bucket belonging to the exchange being
%% deleted -- is removed with mnesia:delete_object/3. Each removal is
%% asserted to return ok, so a failure crashes the caller.
%%
%% For any other first argument nothing is touched and ok is returned.
91 delete(transaction, #exchange { name = Name }, _Bs) ->
92 ok = mnesia:write_lock_table(?TABLE),
93 [ok = mnesia:delete_object(?TABLE, R, write) ||
94 R <- mnesia:match_object(
95 ?TABLE, #bucket{source_number = {Name, '_'}, _ = '_'}, write)],
97 delete(_Tx, _X, _Bs) -> ok.
%% Policy change hook. Both the old and the new exchange are ignored and
%% ok is returned.
policy_changed(_OldExchange, _NewExchange) ->
    ok.
%% Binding creation hook.
%%
%% With transaction as first argument the table is first queried (via
%% mnesia:select) for a #bucket{} row that already carries binding B.
%% The write path then locks the table, derives BucketCount from the
%% binding key K -- which must be the decimal text of an integer, as it
%% goes through list_to_integer/1 -- bounded by lists:min/1, and writes
%% one #bucket{} row keyed {S, N} for every number N produced by
%% find_numbers/3.
%%
%% NOTE(review): presumably the write path only runs when the select
%% finds no existing row for B -- confirm against the full case
%% expression. A key that is not an integer makes list_to_integer/1
%% raise badarg; nothing visible here validates K beforehand.
%%
%% A separate clause handles none as first argument.
101 add_binding(transaction, _X,
102 #binding { source = S, destination = D, key = K } = B) ->
103 %% Use :select rather than :match_object so that we can limit the
104 %% number of results and not bother copying results over to this
106 case mnesia:select(?TABLE,
107 [{#bucket { binding = B, _ = '_' }, [], [ok]}],
110 ok = mnesia:write_lock_table(?TABLE),
111 BucketCount = lists:min([list_to_integer(binary_to_list(K)),
113 [ok = mnesia:write(?TABLE,
114 #bucket { source_number = {S, N},
117 write) || N <- find_numbers(S, BucketCount, [])],
122 add_binding(none, _X, _B) ->
%% Binding removal hook.
%%
%% With transaction as first argument the bucket table is write-locked
%% and rows are deleted by key with mnesia:delete/3. The keys are the
%% source_number values ('$1') returned by an mnesia:select over
%% #bucket{} rows; each delete is asserted to return ok.
%%
%% NOTE(review): presumably the select is restricted to the rows of each
%% binding in Bindings -- confirm against the full comprehension, since
%% the generator over Bindings is not spelled out in the lines below.
%%
%% A separate clause handles none as first argument.
125 remove_bindings(transaction, _X, Bindings) ->
126 ok = mnesia:write_lock_table(?TABLE),
127 [ok = mnesia:delete(?TABLE, Key, write) ||
129 Key <- mnesia:select(?TABLE,
130 [{#bucket { source_number = '$1',
132 _ = '_' }, [], ['$1']}],
135 remove_bindings(none, _X, _Bs) ->
%% Argument equivalence check for a redeclared exchange. This exchange
%% type adds no rules of its own: both arguments are handed, unchanged,
%% to rabbit_exchange:assert_args_equivalence/2 and its result (or
%% exception) is the result of this call.
assert_args_equivalence(Exchange, DeclaredArgs) ->
    rabbit_exchange:assert_args_equivalence(Exchange, DeclaredArgs).
142 mnesia:create_table(?TABLE, [{record_name, bucket},
143 {attributes, record_info(fields, bucket)},
144 {type, ordered_set}]),
145 mnesia:add_table_copy(?TABLE, node(), ram_copies),
146 mnesia:wait_for_tables([?TABLE], 30000),
%% Chooses N bucket numbers for Source that are not yet in the table.
%%
%% Each candidate is drawn uniformly from 0..?PHASH2_RANGE-1 and looked
%% up under key {Source, Number} with a write lock. A free number is
%% pushed onto Acc and N is decremented; a taken number is discarded and
%% another candidate is drawn. Recursion stops when N reaches 0
%% (presumably returning Acc -- confirm).
%%
%% NOTE(review):
%%  * only the table is consulted, not Acc, so the same number can in
%%    principle be picked twice within one call;
%%  * a negative N never reaches the 0 clause, so callers must pass a
%%    non-negative count;
%%  * the random module is deprecated in recent OTP releases in favour
%%    of rand -- consider migrating if the minimum OTP version allows.
149 find_numbers(_Source, 0, Acc) ->
151 find_numbers(Source, N, Acc) ->
152 Number = random:uniform(?PHASH2_RANGE) - 1,
153 case mnesia:read(?TABLE, {Source, Number}, write) of
154 [] -> find_numbers(Source, N-1, [Number | Acc]);
155 [_] -> find_numbers(Source, N, Acc)
%% Selects the term that route/2 feeds to erlang:phash2/2.
%%
%% The first argument is the result of looking up <<"hash-header">> in
%% the exchange arguments:
%%   undefined         - no hash header configured; this clause works
%%                       from the message's routing keys (Routes).
%%   {longstr, Header} - hash on a message header: the headers are
%%                       extracted from the message content and Header
%%                       is looked up in them with
%%                       rabbit_misc:table_lookup/2. The undefined
%%                       branch yields undefined instead of performing
%%                       the lookup (presumably when the message has no
%%                       headers -- confirm the case subject).
158 hash(undefined, #basic_message { routing_keys = Routes }) ->
160 hash({longstr, Header}, #basic_message { content = Content }) ->
161 Headers = rabbit_basic:extract_headers(Content),
163 undefined -> undefined;
164 _ -> rabbit_misc:table_lookup(Headers, Header)