1 %% The contents of this file are subject to the Mozilla Public License
2 %% Version 1.1 (the "License"); you may not use this file except in
3 %% compliance with the License. You may obtain a copy of the License
4 %% at http://www.mozilla.org/MPL/
6 %% Software distributed under the License is distributed on an "AS IS"
7 %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8 %% the License for the specific language governing rights and
9 %% limitations under the License.
11 %% The Original Code is RabbitMQ Consistent Hash Exchange.
13 %% The Initial Developer of the Original Code is GoPivotal, Inc.
14 %% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
17 -module(rabbit_exchange_type_consistent_hash).
18 -include_lib("rabbit_common/include/rabbit.hrl").
20 -behaviour(rabbit_exchange_type).
22 -export([description/0, serialise_events/0, route/2]).
23 -export([validate/1, validate_binding/2,
24 create/2, delete/3, policy_changed/2,
25 add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
28 -record(bucket, {source_number, destination, binding}).
31 {rabbit_exchange_type_consistent_hash_registry,
32 [{description, "exchange type x-consistent-hash: registry"},
33 {mfa, {rabbit_registry, register,
34 [exchange, <<"x-consistent-hash">>, ?MODULE]}},
35 {requires, rabbit_registry},
36 {enables, kernel_ready},
37 {cleanup, {rabbit_registry, unregister,
38 [exchange, <<"x-consistent-hash">>]}}]}).
41 {rabbit_exchange_type_consistent_hash_mnesia,
42 [{description, "exchange type x-consistent-hash: mnesia"},
43 {mfa, {?MODULE, init, []}},
45 {enables, external_infrastructure}]}).
47 -define(TABLE, ?MODULE).
48 -define(PHASH2_RANGE, 134217728). %% 2^27
51 [{description, <<"Consistent Hashing Exchange">>}].
%% rabbit_exchange_type callback. This exchange type does not opt in to
%% serialised event callbacks, so the answer is unconditionally false.
serialise_events() ->
    false.
55 route(#exchange { name = Name,
57 #delivery { message = Msg }) ->
58 %% Yes, we're being exceptionally naughty here, by using ets on an
59 %% mnesia table. However, RabbitMQ-server itself is just as
60 %% naughty, and for good reasons.
62 %% Note that given the nature of this select, it will force mnesia
63 %% to do a linear scan of the entries in the table that have the
64 %% correct exchange name. More sophisticated solutions include,
65 %% for example, having some sort of tree as the value of a single
66 %% mnesia entry for each exchange. However, such values tend to
67 %% end up as relatively deep data structures which cost a lot to
68 %% continually copy to the process heap. Consequently, such
69 %% approaches have not been found to be much faster, if at all.
70 HashOn = rabbit_misc:table_lookup(Args, <<"hash-header">>),
71 H = erlang:phash2(hash(HashOn, Msg), ?PHASH2_RANGE),
72 case ets:select(?TABLE, [{#bucket { source_number = {Name, '$2'},
78 case ets:match_object(?TABLE, #bucket { source_number = {Name, '_'},
80 {[Bucket], _Cont} -> [Bucket#bucket.destination];
83 {Destinations, _Continuation} ->
%% rabbit_exchange_type callback: vet a binding before add_binding/3 runs.
%%
%% For this exchange type the binding's routing key is the number of hash
%% buckets to allocate to the destination. add_binding/3 parses the key
%% with list_to_integer(binary_to_list(K)), so:
%%   * a non-numeric key would crash with badarg inside the mnesia
%%     transaction that holds the table write lock;
%%   * a key of 0 would allocate no buckets at all;
%%   * a negative key would hand find_numbers/3 a count that never reaches
%%     its terminating 0 clause.
%% Rejecting those keys here reports a clean error to the client instead.
%%
%% Returns ok, or {error, {binding_invalid, Format, FormatArgs}}.
validate_binding(_X, #binding { key = K }) ->
    try list_to_integer(binary_to_list(K)) of
        N when N >= 1 ->
            ok;
        _ ->
            {error, {binding_invalid,
                     "The binding key must be greater than 0", []}}
    catch
        error:badarg ->
            {error, {binding_invalid,
                     "The binding key must be an integer: ~p", [K]}}
    end.
%% rabbit_exchange_type callback invoked when an exchange of this type is
%% created. No per-exchange state is set up here, whatever the transaction
%% phase, so the result is always ok.
create(_TxPhase, _Exchange) ->
    ok.
93 delete(transaction, #exchange { name = Name }, _Bs) ->
94 ok = mnesia:write_lock_table(?TABLE),
95 [ok = mnesia:delete_object(?TABLE, R, write) ||
96 R <- mnesia:match_object(
97 ?TABLE, #bucket{source_number = {Name, '_'}, _ = '_'}, write)],
99 delete(_Tx, _X, _Bs) -> ok.
%% rabbit_exchange_type callback run when the policy for an exchange of this
%% type changes. Nothing here depends on policy, so both exchange arguments
%% are ignored and ok is returned.
policy_changed(_OldExchange, _NewExchange) ->
    ok.
103 add_binding(transaction, _X,
104 #binding { source = S, destination = D, key = K } = B) ->
105 %% Use :select rather than :match_object so that we can limit the
106 %% number of results and not bother copying results over to this
108 case mnesia:select(?TABLE,
109 [{#bucket { binding = B, _ = '_' }, [], [ok]}],
112 ok = mnesia:write_lock_table(?TABLE),
113 BucketCount = lists:min([list_to_integer(binary_to_list(K)),
115 [ok = mnesia:write(?TABLE,
116 #bucket { source_number = {S, N},
119 write) || N <- find_numbers(S, BucketCount, [])],
124 add_binding(none, _X, _B) ->
127 remove_bindings(transaction, _X, Bindings) ->
128 ok = mnesia:write_lock_table(?TABLE),
129 [ok = mnesia:delete(?TABLE, Key, write) ||
131 Key <- mnesia:select(?TABLE,
132 [{#bucket { source_number = '$1',
134 _ = '_' }, [], ['$1']}],
137 remove_bindings(none, _X, _Bs) ->
%% rabbit_exchange_type callback: argument-equivalence checking for this
%% exchange type adds no rules of its own. It delegates wholesale to
%% rabbit_exchange:assert_args_equivalence/2 and returns whatever that call
%% returns.
assert_args_equivalence(Exchange, RequestedArgs) ->
    rabbit_exchange:assert_args_equivalence(Exchange, RequestedArgs).
144 mnesia:create_table(?TABLE, [{record_name, bucket},
145 {attributes, record_info(fields, bucket)},
146 {type, ordered_set}]),
147 mnesia:add_table_copy(?TABLE, node(), ram_copies),
148 mnesia:wait_for_tables([?TABLE], 30000),
151 find_numbers(_Source, 0, Acc) ->
153 find_numbers(Source, N, Acc) ->
154 Number = random:uniform(?PHASH2_RANGE) - 1,
155 case mnesia:read(?TABLE, {Source, Number}, write) of
156 [] -> find_numbers(Source, N-1, [Number | Acc]);
157 [_] -> find_numbers(Source, N, Acc)
160 hash(undefined, #basic_message { routing_keys = Routes }) ->
162 hash({longstr, Header}, #basic_message { content = Content }) ->
163 Headers = rabbit_basic:extract_headers(Content),
165 undefined -> undefined;
166 _ -> rabbit_misc:table_lookup(Headers, Header)