Skip to content

Commit

Permalink
Limit number of duplicate requests to snapshot catchup
Browse files Browse the repository at this point in the history
Summary: If `wa_raft_transport` is slow, then the message queue of `wa_raft_snapshot_catchup` can grow unboundedly. Use an ETS table to avoid this.

Reviewed By: jaher

Differential Revision: D67120896

fbshipit-source-id: d14e71fe980b0e7cca46f60e83291256998642e5
  • Loading branch information
hsun324 authored and facebook-github-bot committed Dec 12, 2024
1 parent 9870562 commit 740ccad
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion src/wa_raft_snapshot_catchup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@
terminate/2
]).

%% Testing API
-export([
init_tables/0
]).

%% Scan period in milliseconds.
%% NOTE(review): presumably the interval used by schedule_scan/0, whose
%% definition is not visible here - confirm.
-define(SCAN_EVERY_MS, 500).

%% ETS key marking that a snapshot transport request for the given peer, table
%% and partition is pending: request_snapshot_transport/4 inserts it before
%% casting to the server, and the matching handle_cast/2 clause deletes it when
%% the request is handled.
-define(PENDING_KEY(Peer, Table, Partition), {request_snapshot_transport_pending, Peer, Table, Partition}).

%% Destination node, table and partition. handle_cast/2 builds this as
%% {Peer, Table, Partition} and uses it as the key into the transports map.
-type key() :: {node(), wa_raft:table(), wa_raft:partition()}.
%% Table, partition and log position.
%% NOTE(review): presumably identifies one snapshot of a partition in the
%% snapshots map; its use is not visible here - confirm.
-type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}.

Expand Down Expand Up @@ -75,14 +82,28 @@ current_snapshot_transports() ->

%% Ask the snapshot catchup server to start a snapshot transport to `Peer' for
%% the given table partition.
%%
%% A pending marker is first recorded in the module's ETS table, and the cast is
%% only sent by the caller that actually created the marker. Repeated requests
%% for the same {Peer, Table, Partition} therefore do not accumulate in the
%% server's message queue while an earlier one is still waiting to be handled.
%%
%% Always returns `ok', including when the request is deduplicated or dropped.
-spec request_snapshot_transport(App :: atom(), Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok.
request_snapshot_transport(App, Peer, Table, Partition) ->
    % Only the ETS call is protected. `ets:insert_new/2' raises `badarg' when
    % the named table does not exist (it is created by init_tables/0 from
    % init/1), in which case the request is dropped, just as a cast to a
    % server that is not running would be.
    try ets:insert_new(?MODULE, {?PENDING_KEY(Peer, Table, Partition)}) of
        true ->
            % This caller created the marker, so it is responsible for the cast.
            gen_server:cast(?MODULE, {request_snapshot_transport, App, Peer, Table, Partition});
        false ->
            % An identical request is already pending; do not enqueue another.
            ok
    catch
        error:badarg ->
            ok
    end.

%% gen_server callback. Enables exit trapping, creates the module's ETS table,
%% schedules the first scan and starts with an empty state record.
-spec init(Args :: term()) -> {ok, #state{}}.
init([]) ->
    % Exit signals are delivered as messages instead of terminating this process.
    _ = process_flag(trap_exit, true),
    % The table must exist before request_snapshot_transport/4 can record
    % pending requests in it.
    ok = init_tables(),
    schedule_scan(),
    InitialState = #state{},
    {ok, InitialState}.

%% Create the public, named ETS `set' table (named after this module) in which
%% pending snapshot transport requests are recorded. The calling process
%% becomes the owner of the table.
-spec init_tables() -> ok.
init_tables() ->
    TableOptions = [named_table, public, set],
    % With `named_table', ets:new/2 returns the table name; assert that it did.
    ?MODULE = ets:new(?MODULE, TableOptions),
    ok.

-spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, #state{}} | {reply, term(), #state{}}.
handle_call(current_snapshot_transports, _From, #state{transports = Transports} = State) ->
{reply, [ID || #transport{id = ID} <- maps:values(Transports)], State};
Expand All @@ -92,6 +113,9 @@ handle_call(Request, From, #state{} = State) ->

-spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}.
handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, overload_backoffs = OverloadBackoffs, retry_backoffs = RetryBackoffs} = State) ->
% Just immediately remove the pending key from the ETS. Doing this here is simpler
% but permits a bounded number of extra requests to remain in the queue.
ets:delete(?MODULE, ?PENDING_KEY(Peer, Table, Partition)),
Now = erlang:monotonic_time(millisecond),
Key = {Peer, Table, Partition},
Exists = maps:is_key(Key, Transports),
Expand Down

0 comments on commit 740ccad

Please sign in to comment.