From 5983854fb28e64d2321499a3d16458d15d0b3b64 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:03:58 -0500 Subject: [PATCH 01/17] Add counters for checkpoints These definitions mirror the snapshot counter definitions. --- src/ra.hrl | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index 246283d0..8d8d16be 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -254,6 +254,10 @@ {snapshot_bytes_written, ?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, counter, "Number of snapshot bytes written (not installed)"}, {open_segments, ?C_RA_LOG_OPEN_SEGMENTS, gauge, "Number of open segments"}, + {checkpoints_written, ?C_RA_LOG_CHECKPOINTS_WRITTEN, counter, + "Total number of checkpoints written"}, + {checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter, + "Number of checkpoint bytes written"}, {reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"} ]). -define(C_RA_LOG_WRITE_OPS, 1). @@ -268,7 +272,9 @@ -define(C_RA_LOG_SNAPSHOTS_INSTALLED, 10). -define(C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, 11). -define(C_RA_LOG_OPEN_SEGMENTS, 12). --define(C_RA_LOG_RESERVED, 13). +-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13). +-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14). +-define(C_RA_LOG_RESERVED, 15). -define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1). -define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2). @@ -290,7 +296,8 @@ -define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18). -define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19). -define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20). --define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21). +-define(C_RA_SRV_CHECKPOINTS, ?C_RA_LOG_RESERVED + 21). +-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 22). -define(RA_SRV_COUNTER_FIELDS, @@ -335,6 +342,8 @@ "Total number of local queries"}, {invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter, "Total number of commands received with an invalid reply-mode"}, + {checkpoints, ?C_RA_SRV_CHECKPOINTS, counter, + "The number of checkpoint effects executed"}, {reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"} ]). @@ -345,6 +354,7 @@ -define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5). -define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6). -define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7). +-define(C_RA_SVR_METRIC_CHECKPOINT_INDEX, ?C_RA_SRV_RESERVED + 8). -define(RA_SRV_METRICS_COUNTER_FIELDS, [ @@ -360,7 +370,9 @@ "The last fully written and fsynced index of the log."}, {commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge, "Approximate time taken from an entry being written to the log until it is committed."}, - {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."} + {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."}, + {checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter, + "The current checkpoint index."} ]). -define(RA_COUNTER_FIELDS, From 180ab36dcf7eec0b82eb325c9ec5b8e98f68b87a Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:16 -0500 Subject: [PATCH 02/17] Stub an effect for creating a checkpoint --- src/ra_log.erl | 9 +++++++++ src/ra_machine.erl | 1 + src/ra_server.erl | 12 ++++++++++++ src/ra_server_proc.erl | 7 +++++++ 4 files changed, 29 insertions(+) diff --git a/src/ra_log.erl b/src/ra_log.erl index da712c4c..bd268f86 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -31,6 +31,7 @@ recover_snapshot/1, snapshot_index_term/1, update_release_cursor/5, + checkpoint/5, needs_cache_flush/1, can_write/1, @@ -625,6 +626,14 @@ update_release_cursor(Idx, Cluster, MacVersion, MacState, {State, []} end. +-spec checkpoint(Idx :: ra_index(), Cluster :: ra_cluster(), + MacVersion :: ra_machine:version(), + MacState :: term(), State :: state()) -> + {state(), effects()}. +checkpoint(Idx, Cluster, MacVersion, MacState, + #?MODULE{checkpoint_state = CheckpointState} = State) -> + todo. + -spec flush_cache(state()) -> state(). flush_cache(#?MODULE{cache = Cache} = State) -> State#?MODULE{cache = ra_log_cache:flush(Cache)}. diff --git a/src/ra_machine.erl b/src/ra_machine.erl index ba54092a..255a2928 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -135,6 +135,7 @@ {log, [ra_index()], fun(([user_command()]) -> effects())} | {log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | {release_cursor, ra_index(), state()} | + {checkpoint, ra_index(), state()} | {aux, term()} | garbage_collection. diff --git a/src/ra_server.erl b/src/ra_server.erl index 055a9730..0bffb3c3 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -46,6 +46,7 @@ % TODO: hide behind a handle_leader make_rpcs/1, update_release_cursor/3, + checkpoint/3, persist_last_applied/1, update_peer/3, register_external_log_reader/2, @@ -1643,6 +1644,8 @@ evaluate_commit_index_follower(State, Effects) -> filter_follower_effects(Effects) -> lists:foldr(fun ({release_cursor, _, _} = C, Acc) -> [C | Acc]; + ({checkpoint, _, _} = C, Acc) -> + [C | Acc]; ({record_leader_msg, _} = C, Acc) -> [C | Acc]; ({aux, _} = C, Acc) -> @@ -1846,6 +1849,15 @@ update_release_cursor(Index, MacState, MacState, Log0), {State#{log => Log}, Effects}. +-spec checkpoint(ra_index(), term(), ra_server_state()) -> + {ra_server_state(), effects()}. +checkpoint(Index, MacState, + State = #{log := Log0, cluster := Cluster}) -> + MacVersion = index_machine_version(Index, State), + {Log, Effects} = ra_log:checkpoint(Index, Cluster, + MacVersion, MacState, Log0), + {State#{log => Log}, Effects}. + % Persist last_applied - as there is an inherent race we cannot % always guarantee that side effects won't be re-issued when a % follower that has seen an entry but not the commit_index diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 81f4108f..213687db 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1329,6 +1329,13 @@ handle_effect(RaftState, {release_cursor, Index, MacState}, EvtType, ServerState0), State1 = State0#state{server_state = ServerState}, handle_effects(RaftState, Effects, EvtType, State1, Actions0); +handle_effect(RaftState, {checkpoint, Index, MacState}, EvtType, + #state{server_state = ServerState0} = State0, Actions0) -> + incr_counter(State0#state.conf, ?C_RA_SRV_CHECKPOINTS, 1), + {ServerState, Effects} = ra_server:checkpoint(Index, MacState, + ServerState0), + State1 = State0#state{server_state = ServerState}, + handle_effects(RaftState, Effects, EvtType, State1, Actions0); handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), From fb8813ad874758fbb8a2fd08d0397eff34f16689 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:20 -0500 Subject: [PATCH 03/17] Store checkpoints in `#ra_snapshot{}` Snapshot state already holds nearly everything necessary for checkpoints. We add a parameter and a field to the record for the checkpoint directory and adjust some of the `ra_snapshot` API to differentiate between snapshots and checkpoints. An alternative approach to this would be to have a separate `#ra_snapshot{}` state for snapshots and checkpoints. However we would need to change `#ra_snapshot{}` somewhat substantially to support multiple snapshots, even though there is only ever one useful snapshot. It also makes it slightly harder to ensure that there is only one checkpoint or snapshot being written at a given time, whereas this strategy can continue to use the `pending` field. --- src/ra_log.erl | 9 +++- src/ra_snapshot.erl | 94 +++++++++++++++++++++++++++----------- test/ra_log_2_SUITE.erl | 4 +- test/ra_snapshot_SUITE.erl | 85 ++++++++++++++++++++++++---------- 4 files changed, 138 insertions(+), 54 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index bd268f86..d85a9c96 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -133,7 +133,9 @@ pre_init(#{uid := UId, Dir = server_data_dir(DataDir, UId), SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE), SnapshotsDir = filename:join(Dir, "snapshots"), - _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir, undefined), + CheckpointsDir = filename:join(Dir, "checkpoints"), + _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir, + CheckpointsDir, undefined), ok. -spec init(ra_log_init_args()) -> state(). @@ -150,14 +152,17 @@ init(#{uid := UId, ResendWindow = maps:get(resend_window, Conf, ?DEFAULT_RESEND_WINDOW_SEC), SnapInterval = maps:get(snapshot_interval, Conf, ?SNAPSHOT_INTERVAL), SnapshotsDir = filename:join(Dir, "snapshots"), + CheckpointsDir = filename:join(Dir, "checkpoints"), Counter = maps:get(counter, Conf, undefined), %% ensure directories are there ok = ra_lib:make_dir(Dir), ok = ra_lib:make_dir(SnapshotsDir), + ok = ra_lib:make_dir(CheckpointsDir), % initialise metrics for this server true = ets:insert(ra_log_metrics, {UId, 0, 0, 0, 0}), - SnapshotState = ra_snapshot:init(UId, SnapModule, SnapshotsDir, Counter), + SnapshotState = ra_snapshot:init(UId, SnapModule, SnapshotsDir, + CheckpointsDir, Counter), {SnapIdx, SnapTerm} = case ra_snapshot:current(SnapshotState) of undefined -> {-1, -1}; Curr -> Curr diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index a6a01457..40189b3a 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -20,8 +20,7 @@ read_chunk/3, delete/2, - init/3, - init/4, + init/5, init_ets/0, current/1, pending/1, @@ -44,7 +43,18 @@ -type effect() :: {monitor, process, snapshot_writer, pid()}. --export_type([meta/0, file_err/0, effect/0, chunk_flag/0]). +-type kind() :: snapshot | checkpoint. + +-type checkpoint() :: ra_idxterm(). + +-export_type([ + meta/0, + file_err/0, + effect/0, + chunk_flag/0, + kind/0, + checkpoint/0 + ]). -record(accept, {%% the next expected chunk next = 1 :: non_neg_integer(), @@ -55,14 +65,18 @@ {uid :: ra_uid(), counter :: undefined | counters:counters_ref(), module :: module(), - %% the snapshot directory %% typically /snapshots %% snapshot subdirs are store below %% this as /snapshots/Term_Index - directory :: file:filename(), - pending :: option({pid(), ra_idxterm()}), + snapshot_directory :: file:filename(), + %% /checkpoints + %% like snapshots, these are also stored in subdirs + %% as /checkpoints/Term_Index + checkpoint_directory :: file:filename(), + pending :: option({pid(), ra_idxterm(), kind()}), accepting :: option(#accept{}), - current :: option(ra_idxterm())}). + current :: option(ra_idxterm()), + checkpoints = [] :: list(checkpoint())}). -define(ETSTBL, ra_log_snapshot_state). @@ -139,19 +153,21 @@ -callback context() -> map(). --spec init(ra_uid(), module(), file:filename()) -> - state(). -init(UId, Mod, File) -> - init(UId, Mod, File, undefined). - --spec init(ra_uid(), module(), file:filename(), +-spec init(ra_uid(), module(), file:filename(), file:filename(), undefined | counters:counters_ref()) -> state(). -init(UId, Module, SnapshotsDir, Counter) -> +init(UId, Module, SnapshotsDir, CheckpointDir, Counter) -> State = #?MODULE{uid = UId, counter = Counter, module = Module, - directory = SnapshotsDir}, + snapshot_directory = SnapshotsDir, + checkpoint_directory = CheckpointDir}, + State1 = find_snapshots(State), + find_checkpoints(State1). + +find_snapshots(#?MODULE{uid = UId, + module = Module, + snapshot_directory = SnapshotsDir} = State) -> true = ra_lib:is_dir(SnapshotsDir), {ok, Snaps0} = prim_file:list_dir(SnapshotsDir), Snaps = lists:reverse(lists:sort(Snaps0)), @@ -186,6 +202,30 @@ pick_first_valid(UId, Mod, Dir, [S | Rem]) -> pick_first_valid(UId, Mod, Dir, Rem) end. +find_checkpoints(#?MODULE{uid = UId, + module = Module, + checkpoint_directory = CheckpointDir} = State) -> + true = ra_lib:is_dir(CheckpointDir), + {ok, CPFiles0} = prim_file:list_dir(CheckpointDir), + CPFiles = lists:reverse(lists:sort(CPFiles0)), + Checkpoints = + lists:filtermap( + fun(File) -> + CP = filename:join(CheckpointDir, File), + case Module:validate(CP) of + ok -> + {ok, #{index := Idx, term := Term}} = + Module:read_meta(CP), + {true, {Idx, Term}}; + Err -> + ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as " + "did not validate. Err: ~w", + [UId, CP, Err]), + ra_lib:recursive_delete(CP), + false + end + end, CPFiles), + State#?MODULE{checkpoints = Checkpoints}. -spec init_ets() -> ok. init_ets() -> @@ -200,7 +240,7 @@ init_ets() -> -spec current(state()) -> option(ra_idxterm()). current(#?MODULE{current = Current}) -> Current. --spec pending(state()) -> option({pid(), ra_idxterm()}). +-spec pending(state()) -> option({pid(), ra_idxterm(), kind()}). pending(#?MODULE{pending = Pending}) -> Pending. @@ -211,7 +251,7 @@ accepting(#?MODULE{accepting = #accept{idxterm = Accepting}}) -> Accepting. -spec directory(state()) -> file:filename(). -directory(#?MODULE{directory = Dir}) -> Dir. +directory(#?MODULE{snapshot_directory = Dir}) -> Dir. -spec last_index_for(ra_uid()) -> option(ra_index()). last_index_for(UId) -> @@ -225,7 +265,7 @@ last_index_for(UId) -> begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, #?MODULE{module = Mod, counter = Counter, - directory = Dir} = State) -> + snapshot_directory = Dir} = State) -> %% create directory for this snapshot SnapDir = make_snapshot_dir(Dir, Idx, Term), %% call prepare then write_snapshot @@ -259,7 +299,7 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, complete_snapshot({Idx, _} = IdxTerm, #?MODULE{uid = UId, module = _Mod, - directory = _Dir} = State) -> + snapshot_directory = _Dir} = State) -> true = ets:insert(?ETSTBL, {UId, Idx}), State#?MODULE{pending = undefined, current = IdxTerm}. @@ -268,7 +308,7 @@ complete_snapshot({Idx, _} = IdxTerm, {ok, state()}. begin_accept(#{index := Idx, term := Term} = Meta, #?MODULE{module = Mod, - directory = Dir} = State) -> + snapshot_directory = Dir} = State) -> SnapDir = make_snapshot_dir(Dir, Idx, Term), ok = ra_lib:make_dir(SnapDir), {ok, AcceptState} = Mod:begin_accept(SnapDir, Meta), @@ -280,7 +320,7 @@ begin_accept(#{index := Idx, term := Term} = Meta, accept_chunk(Chunk, Num, last, #?MODULE{uid = UId, module = Mod, - directory = Dir, + snapshot_directory = Dir, current = Current, accepting = #accept{next = Num, idxterm = {Idx, _} = IdxTerm, @@ -314,7 +354,7 @@ accept_chunk(_Chunk, Num, _ChunkFlag, abort_accept(#?MODULE{accepting = undefined} = State) -> State; abort_accept(#?MODULE{accepting = #accept{idxterm = {Idx, Term}}, - directory = Dir} = State) -> + snapshot_directory = Dir} = State) -> ok = delete(Dir, {Idx, Term}), State#?MODULE{accepting = undefined}. @@ -342,7 +382,7 @@ handle_down(_Pid, noproc, State) -> %% finished State; handle_down(Pid, _Info, - #?MODULE{directory = Dir, + #?MODULE{snapshot_directory = Dir, pending = {Pid, IdxTerm}} = State) -> %% delete the pending snapshot directory ok = delete(Dir, IdxTerm), @@ -359,7 +399,7 @@ delete(Dir, {Idx, Term}) -> {ok, Meta :: meta(), ReadState} | {error, term()} when ReadState :: term(). begin_read(#?MODULE{module = Mod, - directory = Dir, + snapshot_directory = Dir, current = {Idx, Term}}, Context) when is_map(Context) -> Location = make_snapshot_dir(Dir, Idx, Term), @@ -371,7 +411,7 @@ begin_read(#?MODULE{module = Mod, {ok, Data :: term(), {next, ReadState} | last} | {error, term()} when ReadState :: term(). read_chunk(ReadState, ChunkSizeBytes, #?MODULE{module = Mod, - directory = Dir, + snapshot_directory = Dir, current = {Idx, Term}}) -> %% TODO: do we need to generate location for every chunk? Location = make_snapshot_dir(Dir, Idx, Term), @@ -384,7 +424,7 @@ read_chunk(ReadState, ChunkSizeBytes, #?MODULE{module = Mod, recover(#?MODULE{current = undefined}) -> {error, no_current_snapshot}; recover(#?MODULE{module = Mod, - directory = Dir, + snapshot_directory = Dir, current = {Idx, Term}}) -> SnapDir = make_snapshot_dir(Dir, Idx, Term), Mod:recover(SnapDir). @@ -401,7 +441,7 @@ read_meta(Module, Location) -> -spec current_snapshot_dir(state()) -> option(file:filename()). -current_snapshot_dir(#?MODULE{directory = Dir, +current_snapshot_dir(#?MODULE{snapshot_directory = Dir, current = {Idx, Term}}) -> make_snapshot_dir(Dir, Idx, Term); current_snapshot_dir(_) -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index d25eb4b1..cc174003 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -1363,9 +1363,11 @@ meta(Idx, Term, Cluster) -> create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) -> OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"), + CPDir = filename:join(?config(priv_dir, Config), "checkpoints"), ok = ra_lib:make_dir(OthDir), + ok = ra_lib:make_dir(CPDir), Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot, - OthDir), + OthDir, CPDir, undefined), MacRef = <<"9">>, {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, Sn0), Sn2 = diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index 1298c618..8d66018e 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -62,9 +62,13 @@ init_per_testcase(TestCase, Config) -> ok = ra_snapshot:init_ets(), SnapDir = filename:join([?config(priv_dir, Config), TestCase, "snapshots"]), + CheckpointDir = filename:join([?config(priv_dir, Config), + TestCase, "checkpoints"]), ok = ra_lib:make_dir(SnapDir), + ok = ra_lib:make_dir(CheckpointDir), [{uid, ra_lib:to_binary(TestCase)}, - {snap_dir, SnapDir} | Config]. + {snap_dir, SnapDir}, + {checkpoint_dir, CheckpointDir} | Config]. end_per_testcase(_TestCase, _Config) -> ok. @@ -75,7 +79,10 @@ end_per_testcase(_TestCase, _Config) -> init_empty(Config) -> UId = ?config(uid, Config), - State = ra_snapshot:init(UId, ?MODULE, ?config(snap_dir, Config), undefined), + State = ra_snapshot:init(UId, ?MODULE, + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% no pending, no current undefined = ra_snapshot:current(State), undefined = ra_snapshot:pending(State), @@ -86,13 +93,15 @@ init_empty(Config) -> take_snapshot(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = ra_snapshot:begin_snapshot(Meta, MacRef, State0), undefined = ra_snapshot:current(State1), - {Pid, {55, 2}} = ra_snapshot:pending(State1), + {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1), receive {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} -> State = ra_snapshot:complete_snapshot(IdxTerm, State1), @@ -108,13 +117,14 @@ take_snapshot(Config) -> take_snapshot_crash(Config) -> UId = ?config(uid, Config), SnapDir = ?config(snap_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined), + CPDir = ?config(checkpoint_dir, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = ra_snapshot:begin_snapshot(Meta, MacRef, State0), undefined = ra_snapshot:current(State1), - {Pid, {55, 2}} = ra_snapshot:pending(State1), + {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1), receive {ra_log_event, _} -> %% just pretend the snapshot event didn't happen @@ -138,7 +148,9 @@ take_snapshot_crash(Config) -> init_recover(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, [node()]), {State1, [{monitor, process, snapshot_writer, _}]} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), @@ -152,7 +164,9 @@ init_recover(Config) -> %% open a new snapshot state to simulate a restart Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -166,7 +180,9 @@ init_recover(Config) -> init_recover_voter_status(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, #{node() => #{voter_status => test}}), {State1, [{monitor, process, snapshot_writer, _}]} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), @@ -180,7 +196,9 @@ init_recover_voter_status(Config) -> %% open a new snapshot state to simulate a restart Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -194,7 +212,9 @@ init_recover_voter_status(Config) -> init_multi(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0), @@ -203,7 +223,7 @@ init_multi(Config) -> State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, State2), - {_, {165, 2}} = ra_snapshot:pending(State3), + {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), {55, 2} = ra_snapshot:current(State3), 55 = ra_snapshot:last_index_for(UId), receive @@ -219,7 +239,9 @@ init_multi(Config) -> %% open a new snapshot state to simulate a restart Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -233,7 +255,8 @@ init_multi(Config) -> init_recover_multi_corrupt(Config) -> UId = ?config(uid, Config), SnapsDir = ?config(snap_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, undefined), + CPDir = ?config(checkpoint_dir, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, CPDir, undefined), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0), @@ -242,7 +265,7 @@ init_recover_multi_corrupt(Config) -> State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, State2), - {_, {165, 2}} = ra_snapshot:pending(State3), + {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), {55, 2} = ra_snapshot:current(State3), 55 = ra_snapshot:last_index_for(UId), receive @@ -262,7 +285,9 @@ init_recover_multi_corrupt(Config) -> %% open a new snapshot state to simulate a restart Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -280,8 +305,10 @@ init_recover_corrupt(Config) -> UId = ?config(uid, Config), Meta = meta(55, 2, [node()]), SnapsDir = ?config(snap_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, undefined), - {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), + CPDir = ?config(checkpoint_dir, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, CPDir, undefined), + {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, + State0), _ = receive {ra_log_event, {snapshot_written, IdxTerm}} -> ra_snapshot:complete_snapshot(IdxTerm, State1) @@ -298,7 +325,9 @@ init_recover_corrupt(Config) -> ets:delete_all_objects(ra_log_snapshot_state), %% open a new snapshot state to simulate a restart Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), %% ensure the corrupt snapshot isn't recovered undefined = ra_snapshot:pending(Recover), undefined = ra_snapshot:current(Recover), @@ -310,7 +339,9 @@ init_recover_corrupt(Config) -> read_snapshot(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, [node()]), MacRef = crypto:strong_rand_bytes(1024 * 4), {State1, _} = @@ -341,7 +372,9 @@ read_all_chunks(ChunkState, State, Size, Acc) -> accept_snapshot(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, [node()]), MetaBin = term_to_binary(Meta), MacRef = crypto:strong_rand_bytes(1024 * 4), @@ -374,7 +407,9 @@ accept_snapshot(Config) -> abort_accept(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), undefined), + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined), Meta = meta(55, 2, [node()]), MacRef = crypto:strong_rand_bytes(1024 * 4), MacBin = term_to_binary(MacRef), @@ -400,7 +435,8 @@ abort_accept(Config) -> accept_receives_snapshot_written_with_lower_index(Config) -> UId = ?config(uid, Config), SnapDir = ?config(snap_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined), + CPDir = ?config(checkpoint_dir, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), MetaLocal = meta(55, 2, [node()]), MetaRemote = meta(165, 2, [node()]), MetaRemoteBin = term_to_binary(MetaRemote), @@ -440,7 +476,8 @@ accept_receives_snapshot_written_with_lower_index(Config) -> accept_receives_snapshot_written_with_higher_index(Config) -> UId = ?config(uid, Config), SnapDir = ?config(snap_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined), + CPDir = ?config(checkpoint_dir, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), MetaRemote = meta(55, 2, [node()]), MetaLocal = meta(165, 2, [node()]), %% begin a local snapshot From 39921b8e4f68894f873f638097a810eac16abec6 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:23 -0500 Subject: [PATCH 04/17] Remove outdated checkpoints on `ra_snapshot:init/5` We delete any checkpoints older than the snapshot index. These will never be useful: it would be faster to recover from a more up-to-date snapshot. So we can delete the directories and omit them from the checkpoint list. --- src/ra_snapshot.erl | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 40189b3a..6b0f3cd7 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -204,8 +204,15 @@ pick_first_valid(UId, Mod, Dir, [S | Rem]) -> find_checkpoints(#?MODULE{uid = UId, module = Module, + current = Current, checkpoint_directory = CheckpointDir} = State) -> true = ra_lib:is_dir(CheckpointDir), + CurrentIdx = case Current of + undefined -> + -1; + {I, _} -> + I + end, {ok, CPFiles0} = prim_file:list_dir(CheckpointDir), CPFiles = lists:reverse(lists:sort(CPFiles0)), Checkpoints = @@ -216,7 +223,17 @@ find_checkpoints(#?MODULE{uid = UId, ok -> {ok, #{index := Idx, term := Term}} = Module:read_meta(CP), - {true, {Idx, Term}}; + case Idx > CurrentIdx of + true -> + {true, {Idx, Term}}; + false -> + ?INFO("ra_snapshot: ~ts: removing " + "checkpoint ~s as was older than the " + "current snapshot.", + [UId, CP]), + delete(CheckpointDir, {Idx, Term}), + false + end; Err -> ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as " "did not validate. Err: ~w", From 48d30e70aa3e569a24857d5dc3431477689608db Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:25 -0500 Subject: [PATCH 05/17] Add `ra_snapshot:latest_checkpoint/1' This is the same as 'ra_snapshot:current/1' but for checkpoints rather than snapshots. We will use this to recover from the most recent checkpoint if it exists and is newer than the snapshot. --- src/ra_snapshot.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 6b0f3cd7..1248991f 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -38,7 +38,9 @@ context/2, handle_down/3, - current_snapshot_dir/1 + current_snapshot_dir/1, + + latest_checkpoint/1 ]). -type effect() :: {monitor, process, snapshot_writer, pid()}. @@ -257,6 +259,10 @@ init_ets() -> -spec current(state()) -> option(ra_idxterm()). current(#?MODULE{current = Current}) -> Current. +-spec latest_checkpoint(state()) -> option(checkpoint()). +latest_checkpoint(#?MODULE{checkpoints = [Current | _]}) -> Current; +latest_checkpoint(#?MODULE{checkpoints = _}) -> undefined. + -spec pending(state()) -> option({pid(), ra_idxterm(), kind()}). pending(#?MODULE{pending = Pending}) -> Pending. From cdc22f6192a6bc58bcba6e96956e28c4fa25b603 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:27 -0500 Subject: [PATCH 06/17] Take checkpoints with the `{checkpoint,Idx,MacState}` effect This updates the snapshotting codepath to accept a new parameter SnapKind :: snapshot | checkpoint otherwise the snapshotting codepath can be almost entirely reused for checkpointing. We don't yet properly handle the `snapshot_written` event and properly complete either the snapshot or the checkpoint depending on which was written - that will be covered in the child commit. --- src/ra_log.erl | 101 +++++++++++++++++++++---------------- src/ra_snapshot.erl | 34 +++++++++---- test/ra_log_2_SUITE.erl | 15 +++--- test/ra_snapshot_SUITE.erl | 44 ++++++++-------- 4 files changed, 113 insertions(+), 81 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index d85a9c96..21d57250 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -63,7 +63,7 @@ ToTerm :: ra_term()}} | {segments, ets:tid(), [segment_ref()]} | {resend_write, ra_index()} | - {snapshot_written, ra_idxterm()} | + {snapshot_written, ra_idxterm(), ra_snapshot:kind()} | {down, pid(), term()}. -type event() :: {ra_log_event, event_body()}. @@ -505,7 +505,7 @@ handle_event({segments, Tid, NewSegs}, end), {State, log_update_effects(Readers, Pid, State)} end; -handle_event({snapshot_written, {SnapIdx, _} = Snap}, +handle_event({snapshot_written, {SnapIdx, _} = Snap, _SnapKind}, #?MODULE{cfg = Cfg, first_index = FstIdx, snapshot_state = SnapState0} = State0) @@ -526,7 +526,7 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap}, %% be for a past index {State#?MODULE{first_index = SnapIdx + 1, snapshot_state = SnapState}, Effects}; -handle_event({snapshot_written, {Idx, Term} = Snap}, +handle_event({snapshot_written, {Idx, Term} = Snap, _SnapKind}, #?MODULE{cfg =#cfg{log_id = LogId}, snapshot_state = SnapState} = State0) -> %% if the snapshot is stale we just want to delete it @@ -621,23 +621,26 @@ snapshot_index_term(#?MODULE{snapshot_state = SS}) -> MacVersion :: ra_machine:version(), MacState :: term(), State :: state()) -> {state(), effects()}. -update_release_cursor(Idx, Cluster, MacVersion, MacState, - #?MODULE{snapshot_state = SnapState} = State) -> - case ra_snapshot:pending(SnapState) of - undefined -> - update_release_cursor0(Idx, Cluster, MacVersion, MacState, State); - _ -> - % if a snapshot is in progress don't even evaluate - {State, []} - end. +update_release_cursor(Idx, Cluster, MacVersion, MacState, State) -> + suggest_snapshot(snapshot, Idx, Cluster, MacVersion, MacState, State). -spec checkpoint(Idx :: ra_index(), Cluster :: ra_cluster(), MacVersion :: ra_machine:version(), MacState :: term(), State :: state()) -> {state(), effects()}. -checkpoint(Idx, Cluster, MacVersion, MacState, - #?MODULE{checkpoint_state = CheckpointState} = State) -> - todo. +checkpoint(Idx, Cluster, MacVersion, MacState, State) -> + suggest_snapshot(checkpoint, Idx, Cluster, MacVersion, MacState, State). + +suggest_snapshot(SnapKind, Idx, Cluster, MacVersion, MacState, + #?MODULE{snapshot_state = SnapshotState} = State) -> + case ra_snapshot:pending(SnapshotState) of + undefined -> + suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State); + _ -> + %% Only one snapshot or checkpoint may be written at a time to + %% prevent excessive I/O usage. + {State, []} + end. -spec flush_cache(state()) -> state(). flush_cache(#?MODULE{cache = Cache} = State) -> @@ -647,28 +650,15 @@ flush_cache(#?MODULE{cache = Cache} = State) -> needs_cache_flush(#?MODULE{cache = Cache}) -> ra_log_cache:needs_flush(Cache). -update_release_cursor0(Idx, Cluster, MacVersion, MacState, - #?MODULE{cfg = #cfg{snapshot_interval = SnapInter}, - reader = Reader, - snapshot_state = SnapState} = State0) -> +suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) -> ClusterServerIds = maps:map(fun (_, V) -> maps:with([voter_status], V) end, Cluster), - SnapLimit = case ra_snapshot:current(SnapState) of - undefined -> SnapInter; - {I, _} -> I + SnapInter - end, Meta = #{index => Idx, cluster => ClusterServerIds, machine_version => MacVersion}, - % The release cursor index is the last entry _not_ contributing - % to the current state. I.e. the last entry that can be discarded. - % Check here if any segments can be release. - case lists:any(fun({_, To, _}) -> To =< Idx end, - ra_log_reader:segment_refs(Reader)) of + case should_snapshot(SnapKind, Idx, State0) of true -> - % segments can be cleared up - % take a snapshot at the release_cursor % TODO: here we use the current cluster configuration in % the snapshot, % _not_ the configuration at the snapshot point. @@ -682,23 +672,41 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState, % or a reference for external storage (e.g. ETS table) case fetch_term(Idx, State0) of {undefined, _} -> - exit({term_not_found_for_index, Idx}); + {State0, []}; {Term, State} -> - write_snapshot(Meta#{term => Term}, MacState, State) - end; - false when Idx > SnapLimit -> - %% periodically take snapshots even if segments cannot be cleared - %% up - case fetch_term(Idx, State0) of - {undefined, State} -> - {State, []}; - {Term, State} -> - write_snapshot(Meta#{term => Term}, MacState, State) + write_snapshot(Meta#{term => Term}, MacState, + SnapKind, State) end; false -> {State0, []} end. +should_snapshot(snapshot, Idx, + #?MODULE{cfg = #cfg{snapshot_interval = SnapInter}, + reader = Reader, + snapshot_state = SnapState}) -> + SnapLimit = case ra_snapshot:current(SnapState) of + undefined -> SnapInter; + {I, _} -> I + SnapInter + end, + % The release cursor index is the last entry _not_ contributing + % to the current state. I.e. the last entry that can be discarded. + % We should take a snapshot if the new snapshot index would allow us + % to discard any segments or if the we've handled enough commands + % since the last snapshot. + CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end, + ra_log_reader:segment_refs(Reader)), + CanFreeSegments orelse Idx > SnapLimit; +should_snapshot(checkpoint, Idx, + #?MODULE{cfg = #cfg{snapshot_interval = CheckpointInter}, + %% ^ TODO: use new cfg var. + snapshot_state = SnapState}) -> + CheckpointLimit = case ra_snapshot:latest_checkpoint(SnapState) of + undefined -> CheckpointInter; + {I, _} -> I + CheckpointInter + end, + Idx > CheckpointLimit. + -spec append_sync(Entry :: log_entry(), State :: state()) -> state() | no_return(). append_sync({Idx, Term, _} = Entry, Log0) -> @@ -985,11 +993,16 @@ write_entries([{FstIdx, _, _} | Rest] = Entries, State0) -> Error end. -write_snapshot(Meta, MacRef, +write_snapshot(Meta, MacRef, SnapKind, #?MODULE{cfg = Cfg, snapshot_state = SnapState0} = State) -> - ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1), - {SnapState, Effects} = ra_snapshot:begin_snapshot(Meta, MacRef, SnapState0), + Counter = case SnapKind of + snapshot -> ?C_RA_LOG_SNAPSHOTS_WRITTEN; + checkpoint -> ?C_RA_LOG_CHECKPOINTS_WRITTEN + end, + ok = incr_counter(Cfg, Counter, 1), + {SnapState, Effects} = ra_snapshot:begin_snapshot(Meta, MacRef, SnapKind, + SnapState0), {State#?MODULE{snapshot_state = SnapState}, Effects}. recover_range(UId, Reader, SegWriter) -> diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 1248991f..c169ca92 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -28,7 +28,7 @@ directory/1, last_index_for/1, - begin_snapshot/3, + begin_snapshot/4, complete_snapshot/2, begin_accept/2, @@ -283,12 +283,20 @@ last_index_for(UId) -> [{_, Index}] -> Index end. --spec begin_snapshot(meta(), ReleaseCursorRef :: term(), state()) -> +-spec begin_snapshot(meta(), ReleaseCursorRef :: term(), kind(), state()) -> {state(), [effect()]}. -begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, +begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, #?MODULE{module = Mod, counter = Counter, - snapshot_directory = Dir} = State) -> + snapshot_directory = SnapshotDir, + checkpoint_directory = CheckpointDir} = State) -> + {CounterIdx, Dir} = + case SnapKind of + snapshot -> + {?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, SnapshotDir}; + checkpoint -> + {?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, CheckpointDir} + end, %% create directory for this snapshot SnapDir = make_snapshot_dir(Dir, Idx, Term), %% call prepare then write_snapshot @@ -302,19 +310,18 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, case Mod:write(SnapDir, Meta, Ref) of ok -> ok; {ok, BytesWritten} -> - counters_add(Counter, - ?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, + counters_add(Counter, CounterIdx, BytesWritten), ok end, Self ! {ra_log_event, - {snapshot_written, {Idx, Term}}}, + {snapshot_written, {Idx, Term}, SnapKind}}, ok end), %% record snapshot in progress %% emit an effect that monitors the current snapshot attempt - {State#?MODULE{pending = {Pid, {Idx, Term}}}, + {State#?MODULE{pending = {Pid, {Idx, Term}, SnapKind}}, [{monitor, process, snapshot_writer, Pid}]}. -spec complete_snapshot(ra_idxterm(), state()) -> @@ -405,9 +412,14 @@ handle_down(_Pid, noproc, State) -> %% finished State; handle_down(Pid, _Info, - #?MODULE{snapshot_directory = Dir, - pending = {Pid, IdxTerm}} = State) -> - %% delete the pending snapshot directory + #?MODULE{snapshot_directory = SnapshotDir, + checkpoint_directory = CheckpointDir, + pending = {Pid, IdxTerm, SnapKind}} = State) -> + %% delete the pending snapshot/checkpoint directory + Dir = case SnapKind of + snapshot -> SnapshotDir; + checkpoint -> CheckpointDir + end, ok = delete(Dir, IdxTerm), State#?MODULE{pending = undefined}. diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index cc174003..6ba16301 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -311,7 +311,8 @@ sparse_read_out_of_range_2(Config) -> {Log2, _} = ra_log:update_release_cursor(SnapIdx, #{}, 2, <<"snap@10">>, Log1), {Log3, _} = receive - {ra_log_event, {snapshot_written, {10, 2}} = Evt} -> + {ra_log_event, {snapshot_written, {10, 2}, + snapshot} = Evt} -> ra_log:handle_event(Evt, Log2) after 5000 -> flush(), @@ -397,7 +398,8 @@ written_event_after_snapshot(Config) -> {Log2, _} = ra_log:update_release_cursor(2, #{}, 1, <<"one+two">>, Log1b), {Log3, _} = receive - {ra_log_event, {snapshot_written, {2, 1}} = Evt} -> + {ra_log_event, {snapshot_written, {2, 1}, + snapshot} = Evt} -> ra_log:handle_event(Evt, Log2) after 500 -> exit(snapshot_written_timeout) @@ -412,7 +414,7 @@ written_event_after_snapshot(Config) -> <<"one+two+three+four">>, Log6b), _ = receive - {ra_log_event, {snapshot_written, {4, 1}} = E} -> + {ra_log_event, {snapshot_written, {4, 1}, snapshot} = E} -> ra_log:handle_event(E, Log7) after 500 -> exit(snapshot_written_timeout) @@ -699,7 +701,8 @@ snapshot_written_after_installation(Config) -> {Log2, _} = ra_log:update_release_cursor(5, #{}, 1, <<"one-five">>, Log1), DelayedSnapWritten = receive - {ra_log_event, {snapshot_written, {5, 1}} = Evt} -> + {ra_log_event, {snapshot_written, {5, 1}, + snapshot} = Evt} -> Evt after 1000 -> flush(), @@ -1369,10 +1372,10 @@ create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) -> Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot, OthDir, CPDir, undefined), MacRef = <<"9">>, - {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, Sn0), + {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, Sn0), Sn2 = receive - {ra_log_event, {snapshot_written, {Idx, 2} = IdxTerm}} -> + {ra_log_event, {snapshot_written, {Idx, 2} = IdxTerm, snapshot}} -> ra_snapshot:complete_snapshot(IdxTerm, Sn1) after 1000 -> exit(snapshot_timeout) diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index 8d66018e..ce9d9595 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -99,11 +99,11 @@ take_snapshot(Config) -> Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = - ra_snapshot:begin_snapshot(Meta, MacRef, State0), + ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0), undefined = ra_snapshot:current(State1), {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1), receive - {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} -> + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> State = ra_snapshot:complete_snapshot(IdxTerm, State1), undefined = ra_snapshot:pending(State), {55, 2} = ra_snapshot:current(State), @@ -122,7 +122,7 @@ take_snapshot_crash(Config) -> Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = - ra_snapshot:begin_snapshot(Meta, MacRef, State0), + ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0), undefined = ra_snapshot:current(State1), {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1), receive @@ -153,9 +153,9 @@ init_recover(Config) -> undefined), Meta = meta(55, 2, [node()]), {State1, [{monitor, process, snapshot_writer, _}]} = - ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), + ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> _ = ra_snapshot:complete_snapshot(IdxTerm, State1), ok after 1000 -> @@ -185,9 +185,9 @@ init_recover_voter_status(Config) -> undefined), Meta = meta(55, 2, #{node() => #{voter_status => test}}), {State1, [{monitor, process, snapshot_writer, _}]} = - ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), + ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> _ = ra_snapshot:complete_snapshot(IdxTerm, State1), ok after 1000 -> @@ -217,12 +217,13 @@ init_multi(Config) -> undefined), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), - {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0), + {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot, + State0), receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, - State2), + snapshot, State2), {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), {55, 2} = ra_snapshot:current(State3), 55 = ra_snapshot:last_index_for(UId), @@ -259,12 +260,13 @@ init_recover_multi_corrupt(Config) -> State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, CPDir, undefined), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), - {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0), + {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot, + State0), receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, - State2), + snapshot, State2), {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), {55, 2} = ra_snapshot:current(State3), 55 = ra_snapshot:last_index_for(UId), @@ -310,7 +312,7 @@ init_recover_corrupt(Config) -> {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), _ = receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> ra_snapshot:complete_snapshot(IdxTerm, State1) after 1000 -> error(snapshot_event_timeout) @@ -345,9 +347,9 @@ read_snapshot(Config) -> Meta = meta(55, 2, [node()]), MacRef = crypto:strong_rand_bytes(1024 * 4), {State1, _} = - ra_snapshot:begin_snapshot(Meta, MacRef, State0), + ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0), State = receive - {ra_log_event, {snapshot_written, IdxTerm}} -> + {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> ra_snapshot:complete_snapshot(IdxTerm, State1) after 1000 -> error(snapshot_event_timeout) @@ -441,7 +443,8 @@ accept_receives_snapshot_written_with_lower_index(Config) -> MetaRemote = meta(165, 2, [node()]), MetaRemoteBin = term_to_binary(MetaRemote), %% begin a local snapshot - {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, State0), + {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, + snapshot, State0), MacRef = crypto:strong_rand_bytes(1024), MacBin = term_to_binary(MacRef), Crc = erlang:crc32([<<(size(MetaRemoteBin)):32/unsigned>>, @@ -458,7 +461,7 @@ accept_receives_snapshot_written_with_lower_index(Config) -> %% then the snapshot written event is received receive - {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} -> + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> State4 = ra_snapshot:complete_snapshot(IdxTerm, State3), undefined = ra_snapshot:pending(State4), {55, 2} = ra_snapshot:current(State4), @@ -481,7 +484,8 @@ accept_receives_snapshot_written_with_higher_index(Config) -> MetaRemote = meta(55, 2, [node()]), MetaLocal = meta(165, 2, [node()]), %% begin a local snapshot - {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, State0), + {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, + snapshot, State0), MacRef = crypto:strong_rand_bytes(1024), MacBin = term_to_binary(MacRef), %% split into 1024 max byte chunks @@ -496,7 +500,7 @@ accept_receives_snapshot_written_with_higher_index(Config) -> %% then the snapshot written event is received receive - {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} -> + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> State4 = ra_snapshot:complete_snapshot(IdxTerm, State3), undefined = ra_snapshot:pending(State4), {55, 2} = ra_snapshot:current(State4), From b7161ec7dbb8bdc891f32c64a9e7822783da4003 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:32 -0500 Subject: [PATCH 07/17] ra_log: Handle snapshot kind in `snapshot_written` event `snapshot_written` with a `ra_snapshot:kind()` of `snapshot` retains the same behavior as before checkpoints. We add a clause to handle `checkpoint`s though which adds the checkpoint to the `#ra_snapshot{}` state and thins out any surplus checkpoints. When we write a regular `snapshot` kind, we also remove any checkpoints older than the snapshot's index. These older checkpoints have no use once there's a snapshot newer than them: recovery would only be slower and there's no point in promoting a checkpoint for a state older than the current snapshot - it would be a no-op. --- src/ra_lib.erl | 7 +++ src/ra_log.erl | 60 ++++++++++++------- src/ra_snapshot.erl | 117 +++++++++++++++++++++++++++++++++---- test/ra_log_2_SUITE.erl | 2 +- test/ra_snapshot_SUITE.erl | 18 +++--- 5 files changed, 162 insertions(+), 42 deletions(-) diff --git a/src/ra_lib.erl b/src/ra_lib.erl index 41995b73..5e9c6c82 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -41,6 +41,7 @@ write_file/2, lists_chunk/2, lists_detect_sort/1, + lists_shuffle/1, is_dir/1, is_file/1, ensure_dir/1, @@ -382,6 +383,12 @@ do_ascending(A, [B | Rem]) do_ascending(_A, _) -> unsorted. +%% Reorder a list randomly. +-spec lists_shuffle(list()) -> list(). +lists_shuffle(List0) -> + List1 = [{rand:uniform(), Elem} || Elem <- List0], + [Elem || {_, Elem} <- lists:keysort(1, List1)]. + is_dir(Dir) -> case prim_file:read_file_info(Dir) of {ok, #file_info{type=directory}} -> diff --git a/src/ra_log.erl b/src/ra_log.erl index 21d57250..4611eb82 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -505,37 +505,55 @@ handle_event({segments, Tid, NewSegs}, end), {State, log_update_effects(Readers, Pid, State)} end; -handle_event({snapshot_written, {SnapIdx, _} = Snap, _SnapKind}, +handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind}, #?MODULE{cfg = Cfg, first_index = FstIdx, snapshot_state = SnapState0} = State0) %% only update snapshot if it is newer than the last snapshot when SnapIdx >= FstIdx -> - % delete any segments outside of first_index - {State, Effects0} = delete_segments(SnapIdx, State0), - SnapState = ra_snapshot:complete_snapshot(Snap, SnapState0), - put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), - %% delete old snapshot files - %% This is done as an effect - %% so that if an old snapshot is still being replicated - %% the cleanup can be delayed until it is safe - Effects = [{delete_snapshot, - ra_snapshot:directory(SnapState), - ra_snapshot:current(SnapState0)} | Effects0], - %% do not set last written index here as the snapshot may - %% be for a past index - {State#?MODULE{first_index = SnapIdx + 1, - snapshot_state = SnapState}, Effects}; -handle_event({snapshot_written, {Idx, Term} = Snap, _SnapKind}, + SnapState1 = ra_snapshot:complete_snapshot(Snap, SnapKind, SnapState0), + case SnapKind of + snapshot -> + put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), + % delete any segments outside of first_index + {State, Effects0} = delete_segments(SnapIdx, State0), + %% Delete old snapshot files. This is done as an effect + %% so that if an old snapshot is still being replicated + %% the cleanup can be delayed until it is safe. + DeleteCurrentSnap = {delete_snapshot, + ra_snapshot:directory(SnapState1, snapshot), + ra_snapshot:current(SnapState0)}, + %% Also delete any checkpoints older than this snapshot. + {SnapState, Checkpoints} = + ra_snapshot:take_older_checkpoints(SnapIdx, SnapState1), + CPEffects = [{delete_snapshot, + ra_snapshot:directory(SnapState, checkpoint), + Checkpoint} || Checkpoint <- Checkpoints], + Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0, + %% do not set last written index here as the snapshot may + %% be for a past index + {State#?MODULE{first_index = SnapIdx + 1, + snapshot_state = SnapState}, Effects}; + checkpoint -> + put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx), + %% If we already have the maximum allowed number of checkpoints, + %% remove some checkpoints to make space. + {SnapState, CPs} = ra_snapshot:take_extra_checkpoints(SnapState1), + Effects = [{delete_snapshot, + ra_snapshot:directory(SnapState, SnapKind), + CP} || CP <- CPs], + {State0#?MODULE{snapshot_state = SnapState}, Effects} + end; +handle_event({snapshot_written, {Idx, Term} = Snap, SnapKind}, #?MODULE{cfg =#cfg{log_id = LogId}, snapshot_state = SnapState} = State0) -> - %% if the snapshot is stale we just want to delete it + %% if the snapshot/checkpoint is stale we just want to delete it Current = ra_snapshot:current(SnapState), ?INFO("~ts: old snapshot_written received for index ~b in term ~b - current snapshot ~w, deleting old snapshot", - [LogId, Idx, Term, Current]), + current snapshot ~w, deleting old ~s", + [LogId, Idx, Term, Current, SnapKind]), Effects = [{delete_snapshot, - ra_snapshot:directory(SnapState), + ra_snapshot:directory(SnapState, SnapKind), Snap}], {State0, Effects}; handle_event({resend_write, Idx}, State) -> diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index c169ca92..309523c2 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -25,11 +25,11 @@ current/1, pending/1, accepting/1, - directory/1, + directory/2, last_index_for/1, begin_snapshot/4, - complete_snapshot/2, + complete_snapshot/3, begin_accept/2, accept_chunk/4, @@ -40,7 +40,10 @@ handle_down/3, current_snapshot_dir/1, - latest_checkpoint/1 + latest_checkpoint/1, + + take_older_checkpoints/2, + take_extra_checkpoints/1 ]). -type effect() :: {monitor, process, snapshot_writer, pid()}. @@ -82,6 +85,9 @@ -define(ETSTBL, ra_log_snapshot_state). +%% TODO: Make this constant configurable? +-define(MAX_CHECKPOINTS, 10). + -opaque state() :: #?MODULE{}. -export_type([state/0]). @@ -273,8 +279,9 @@ accepting(#?MODULE{accepting = undefined}) -> accepting(#?MODULE{accepting = #accept{idxterm = Accepting}}) -> Accepting. --spec directory(state()) -> file:filename(). -directory(#?MODULE{snapshot_directory = Dir}) -> Dir. +-spec directory(state(), kind()) -> file:filename(). +directory(#?MODULE{snapshot_directory = Dir}, snapshot) -> Dir; +directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir. -spec last_index_for(ra_uid()) -> option(ra_index()). last_index_for(UId) -> @@ -324,15 +331,17 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, {State#?MODULE{pending = {Pid, {Idx, Term}, SnapKind}}, [{monitor, process, snapshot_writer, Pid}]}. --spec complete_snapshot(ra_idxterm(), state()) -> +-spec complete_snapshot(ra_idxterm(), kind(), state()) -> state(). -complete_snapshot({Idx, _} = IdxTerm, - #?MODULE{uid = UId, - module = _Mod, - snapshot_directory = _Dir} = State) -> +complete_snapshot({Idx, _} = IdxTerm, snapshot, + #?MODULE{uid = UId} = State) -> true = ets:insert(?ETSTBL, {UId, Idx}), State#?MODULE{pending = undefined, - current = IdxTerm}. + current = IdxTerm}; +complete_snapshot(IdxTerm, checkpoint, + #?MODULE{checkpoints = Checkpoints0} = State) -> + State#?MODULE{pending = undefined, + checkpoints = [IdxTerm | Checkpoints0]}. -spec begin_accept(meta(), state()) -> {ok, state()}. @@ -482,6 +491,29 @@ current_snapshot_dir(#?MODULE{snapshot_directory = Dir, current_snapshot_dir(_) -> undefined. +-spec take_older_checkpoints(ra_index(), state()) -> + {state(), [checkpoint()]}. +take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) -> + {Checkpoints, Outdated} = lists:splitwith(fun ({CPIdx, _Term}) -> + CPIdx > Idx + end, Checkpoints0), + {State0#?MODULE{checkpoints = Checkpoints}, Outdated}. + +-spec take_extra_checkpoints(state()) -> + {state(), [checkpoint()]}. +take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0} = State0) -> + Len = erlang:length(Checkpoints0), + case Len - ?MAX_CHECKPOINTS of + ToDelete when ToDelete > 0 -> + %% Take `ToDelete' checkpoints from the list randomly without + %% ever taking the first or last checkpoint. + IdxsToTake = random_idxs_to_take(?MAX_CHECKPOINTS, ToDelete), + {Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake), + {State0#?MODULE{checkpoints = Checkpoints}, Extras}; + _ -> + {State0, []} + end. + %% Utility make_snapshot_dir(Dir, Index, Term) -> @@ -493,3 +525,66 @@ counters_add(undefined, _, _) -> ok; counters_add(Counter, Ix, Incr) -> counters:add(Counter, Ix, Incr). + +random_idxs_to_take(Max, N) -> + %% Always retain the first and last elements. + AllIdxs = lists:seq(2, Max - 1), + %% Take a random subset of those indices of length N. + lists:sublist(ra_lib:lists_shuffle(AllIdxs), N). + +%% Take items from the given list by the given indices without disturbing the +%% order of the list. +-spec lists_take_idxs(List, Idxs) -> {List1, Taken} when + List :: list(Elem), + Elem :: any(), + Idxs :: list(pos_integer()), + List1 :: list(Elem), + Taken :: list(Elem). +lists_take_idxs(List, Idxs0) -> + %% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists + Idxs = lists:sort(Idxs0), + %% 1-indexing like the `lists' module. + lists_take_idxs(List, Idxs, 1, [], []). + +lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) -> + lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc); +lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) -> + lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]); +lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) -> + {lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)}; +lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) -> + {lists:reverse(ElemAcc), lists:reverse(TakeAcc)}. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +random_idxs_to_take_test() -> + Idxs = random_idxs_to_take(10, 3), + ?assertEqual(3, length(Idxs)), + [Min, _, Max] = lists:sort(Idxs), + %% The first and last elements are excluded. + ?assert(Min > 1), + ?assert(Max < 10), + ok. + +lists_take_idxs_test() -> + ?assertEqual( + {[1, 3, 5, 7, 8], [2, 4, 6]}, + lists_take_idxs(lists:seq(1, 8), [2, 4, 6])), + + %% Ordering of `Idxs' doesn't matter. + ?assertEqual( + {[1, 3, 5, 7, 8], [2, 4, 6]}, + lists_take_idxs(lists:seq(1, 8), [4, 6, 2])), + + ?assertEqual( + {[a, c], [b]}, + lists_take_idxs([a, b, c], [2])), + + %% `List''s order is preserved even when nothing is taken. + ?assertEqual( + {[a, b, c], []}, + lists_take_idxs([a, b, c], [])), + ok. + +-endif. diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 6ba16301..bd0afce1 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -1376,7 +1376,7 @@ create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) -> Sn2 = receive {ra_log_event, {snapshot_written, {Idx, 2} = IdxTerm, snapshot}} -> - ra_snapshot:complete_snapshot(IdxTerm, Sn1) + ra_snapshot:complete_snapshot(IdxTerm, snapshot, Sn1) after 1000 -> exit(snapshot_timeout) end, diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index ce9d9595..b41ff89b 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -104,7 +104,7 @@ take_snapshot(Config) -> {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1), receive {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> - State = ra_snapshot:complete_snapshot(IdxTerm, State1), + State = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1), undefined = ra_snapshot:pending(State), {55, 2} = ra_snapshot:current(State), 55 = ra_snapshot:last_index_for(UId), @@ -156,7 +156,7 @@ init_recover(Config) -> ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - _ = ra_snapshot:complete_snapshot(IdxTerm, State1), + _ = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1), ok after 1000 -> error(snapshot_event_timeout) @@ -188,7 +188,7 @@ init_recover_voter_status(Config) -> ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - _ = ra_snapshot:complete_snapshot(IdxTerm, State1), + _ = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1), ok after 1000 -> error(snapshot_event_timeout) @@ -221,7 +221,7 @@ init_multi(Config) -> State0), receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), + State2 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, snapshot, State2), {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), @@ -264,7 +264,7 @@ init_recover_multi_corrupt(Config) -> State0), receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - State2 = ra_snapshot:complete_snapshot(IdxTerm, State1), + State2 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1), {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, snapshot, State2), {_, {165, 2}, snapshot} = ra_snapshot:pending(State3), @@ -313,7 +313,7 @@ init_recover_corrupt(Config) -> State0), _ = receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - ra_snapshot:complete_snapshot(IdxTerm, State1) + ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1) after 1000 -> error(snapshot_event_timeout) end, @@ -350,7 +350,7 @@ read_snapshot(Config) -> ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0), State = receive {ra_log_event, {snapshot_written, IdxTerm, snapshot}} -> - ra_snapshot:complete_snapshot(IdxTerm, State1) + ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1) after 1000 -> error(snapshot_event_timeout) end, @@ -462,7 +462,7 @@ accept_receives_snapshot_written_with_lower_index(Config) -> %% then the snapshot written event is received receive {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> - State4 = ra_snapshot:complete_snapshot(IdxTerm, State3), + State4 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State3), undefined = ra_snapshot:pending(State4), {55, 2} = ra_snapshot:current(State4), 55 = ra_snapshot:last_index_for(UId), @@ -501,7 +501,7 @@ accept_receives_snapshot_written_with_higher_index(Config) -> %% then the snapshot written event is received receive {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} -> - State4 = ra_snapshot:complete_snapshot(IdxTerm, State3), + State4 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State3), undefined = ra_snapshot:pending(State4), {55, 2} = ra_snapshot:current(State4), 55 = ra_snapshot:last_index_for(UId), From a6c742c4c5a1fd844dddc13acaa7a6a641e8ec57 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:35 -0500 Subject: [PATCH 08/17] Add a `{release_cursor,Idx}` effect to promote checkpoints This effect allows you to turn a checkpoint into a snapshot. This is a useful operation for Ra machine's like `rabbit_fifo` (quorum queues) which read information out of the log. `rabbit_fifo` may only snapshot up to an index where all enqueue/requeue commands before that index have been dequeued because snapshotting deletes the enqueue/requeue commands from the log. So `rabbit_fifo` needs a way to express that it can snapshot up to a certain index which is not necessarily the current index when it emits the `release_cursor` effect. Checkpoints contain states at older indices which are perfect for this - they just need to be moved from `DataDir/checkpoints/` to `DataDir/snapshots/`. Using a file rename minimizes the I/O cost of promotion. --- src/ra_log.erl | 15 ++++++++++++++ src/ra_machine.erl | 1 + src/ra_server.erl | 9 ++++++++ src/ra_server_proc.erl | 6 ++++++ src/ra_snapshot.erl | 47 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 78 insertions(+) diff --git a/src/ra_log.erl b/src/ra_log.erl index 4611eb82..12c8eb75 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -32,6 +32,7 @@ snapshot_index_term/1, update_release_cursor/5, checkpoint/5, + promote_checkpoint/2, needs_cache_flush/1, can_write/1, @@ -660,6 +661,20 @@ suggest_snapshot(SnapKind, Idx, Cluster, MacVersion, MacState, {State, []} end. +promote_checkpoint(Idx, #?MODULE{cfg = Cfg, + snapshot_state = SnapState0} = State) -> + case ra_snapshot:pending(SnapState0) of + {_WriterPid, _IdxTerm, snapshot} -> + %% If we're currently writing a snapshot, skip promoting a + %% checkpoint. + {State, []}; + _ -> + ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1), + {SnapState, Effects} = ra_snapshot:promote_checkpoint(Idx, + SnapState0), + {State#?MODULE{snapshot_state = SnapState}, Effects} + end. + -spec flush_cache(state()) -> state(). flush_cache(#?MODULE{cache = Cache} = State) -> State#?MODULE{cache = ra_log_cache:flush(Cache)}. diff --git a/src/ra_machine.erl b/src/ra_machine.erl index 255a2928..99c372f6 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -135,6 +135,7 @@ {log, [ra_index()], fun(([user_command()]) -> effects())} | {log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | {release_cursor, ra_index(), state()} | + {release_cursor, ra_index()} | {checkpoint, ra_index(), state()} | {aux, term()} | garbage_collection. diff --git a/src/ra_server.erl b/src/ra_server.erl index 0bffb3c3..e1815116 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -46,6 +46,7 @@ % TODO: hide behind a handle_leader make_rpcs/1, update_release_cursor/3, + promote_checkpoint/2, checkpoint/3, persist_last_applied/1, update_peer/3, @@ -1644,6 +1645,8 @@ evaluate_commit_index_follower(State, Effects) -> filter_follower_effects(Effects) -> lists:foldr(fun ({release_cursor, _, _} = C, Acc) -> [C | Acc]; + ({release_cursor, _} = C, Acc) -> + [C | Acc]; ({checkpoint, _, _} = C, Acc) -> [C | Acc]; ({record_leader_msg, _} = C, Acc) -> @@ -1858,6 +1861,12 @@ checkpoint(Index, MacState, MacVersion, MacState, Log0), {State#{log => Log}, Effects}. +-spec promote_checkpoint(ra_index(), ra_server_state()) -> + {ra_server_state(), effects()}. +promote_checkpoint(Index, #{log := Log0} = State) -> + {Log, Effects} = ra_log:promote_checkpoint(Index, Log0), + {State#{log => Log}, Effects}. + % Persist last_applied - as there is an inherent race we cannot % always guarantee that side effects won't be re-issued when a % follower that has seen an entry but not the commit_index diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 213687db..7acdbecf 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1329,6 +1329,12 @@ handle_effect(RaftState, {release_cursor, Index, MacState}, EvtType, ServerState0), State1 = State0#state{server_state = ServerState}, handle_effects(RaftState, Effects, EvtType, State1, Actions0); +handle_effect(RaftState, {release_cursor, Index}, EvtType, + #state{server_state = ServerState0} = State0, Actions0) -> + incr_counter(State0#state.conf, ?C_RA_SRV_RELEASE_CURSORS, 1), + {ServerState, Effects} = ra_server:promote_checkpoint(Index, ServerState0), + State1 = State0#state{server_state = ServerState}, + handle_effects(RaftState, Effects, EvtType, State1, Actions0); handle_effect(RaftState, {checkpoint, Index, MacState}, EvtType, #state{server_state = ServerState0} = State0, Actions0) -> incr_counter(State0#state.conf, ?C_RA_SRV_CHECKPOINTS, 1), diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 309523c2..c653eaee 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -29,6 +29,7 @@ last_index_for/1, begin_snapshot/4, + promote_checkpoint/2, complete_snapshot/3, begin_accept/2, @@ -331,6 +332,52 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, {State#?MODULE{pending = {Pid, {Idx, Term}, SnapKind}}, [{monitor, process, snapshot_writer, Pid}]}. +-spec promote_checkpoint(Idx :: ra_index(), State0 :: state()) -> + {State :: state(), Effects :: [effect()]}. +promote_checkpoint(PromotionIdx, + #?MODULE{snapshot_directory = SnapDir, + checkpoint_directory = CheckpointDir, + checkpoints = Checkpoints0} = State0) -> + %% Find the checkpoint with the highest index smaller than or equal to the + %% given `Idx' and rename the checkpoint directory to the snapshot + %% directory. + case find_promotable_checkpoint(PromotionIdx, Checkpoints0, []) of + {Checkpoints, {Idx, Term}} -> + Checkpoint = make_snapshot_dir(CheckpointDir, Idx, Term), + Snapshot = make_snapshot_dir(SnapDir, Idx, Term), + Self = self(), + Pid = spawn(fun() -> + ok = file:rename(Checkpoint, Snapshot), + Self ! {ra_log_event, + {snapshot_written, + {Idx, Term}, snapshot}} + end), + State = State0#?MODULE{pending = {Pid, {Idx, Term}, snapshot}, + checkpoints = Checkpoints}, + {State, [{monitor, process, snapshot_writer, Pid}]}; + undefined -> + {State0, []} + end. + +%% Find the first checkpoint smaller than or equal to the promotion index and +%% remove it from the checkpoint list. +-spec find_promotable_checkpoint(PromotionIdx, Checkpoints, Acc) -> Result + when + PromotionIdx :: ra_index(), + Checkpoints :: [ra_idxterm()], + Acc :: [ra_idxterm()], + Result :: option({[ra_idxterm()], ra_idxterm()}). +find_promotable_checkpoint(Idx, [{CPIdx, _} = CP | Rest], Acc) + when CPIdx =< Idx -> + %% Checkpoints are sorted by index descending so the first checkpoint + %% with an index smaller than or equal to the promotion index is the proper + %% checkpoint to promote. + {lists:reverse(Rest, Acc), CP}; +find_promotable_checkpoint(Idx, [CP | Rest], Acc) -> + find_promotable_checkpoint(Idx, Rest, [CP | Acc]); +find_promotable_checkpoint(_Idx, [], _Acc) -> + undefined. + -spec complete_snapshot(ra_idxterm(), kind(), state()) -> state(). complete_snapshot({Idx, _} = IdxTerm, snapshot, From 8ec20fa0386e7c06ffebaedc16f8971181b127eb Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:38 -0500 Subject: [PATCH 09/17] Recover from a checkpoint if available This should greatly improve recovery time in degenerate cases for machines like `rabbit_fifo`. `rabbit_fifo` only snapshots up to indices where all prior enqueued messages have been consumed. So recovery time is proportional to the number of outstanding enqueued messages unless we can recover from a checkpoint. With this change and switching to checkpoints in `rabbit_fifo`, recovery should be roughly constant-time. The only part of the code necessary to change for this is `ra_snapshot:recover/1`. That function would previously recover from the snapshot if it existed but now it can check to see if there is a more recent checkpoint. The rest of the machinery in `ra_log` transparently takes care of recovering from the checkpoint's index. --- src/ra_snapshot.erl | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index c653eaee..ec5a4179 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -508,17 +508,29 @@ read_chunk(ReadState, ChunkSizeBytes, #?MODULE{module = Mod, Location = make_snapshot_dir(Dir, Idx, Term), Mod:read_chunk(ReadState, ChunkSizeBytes, Location). +%% Recovers from the latest checkpoint or snapshot, if available. -spec recover(state()) -> {ok, Meta :: meta(), State :: term()} | {error, no_current_snapshot} | {error, term()}. -recover(#?MODULE{current = undefined}) -> +recover(#?MODULE{current = undefined, checkpoints = []}) -> {error, no_current_snapshot}; recover(#?MODULE{module = Mod, - snapshot_directory = Dir, - current = {Idx, Term}}) -> - SnapDir = make_snapshot_dir(Dir, Idx, Term), - Mod:recover(SnapDir). + current = Snapshot, + snapshot_directory = SnapDir, + checkpoints = Checkpoints, + checkpoint_directory = CheckpointDir}) -> + %% If there are checkpoints and a snapshot, recover from whichever has the + %% highest index. Otherwise recover from whichever exists. + Dir = case {Snapshot, Checkpoints} of + {{SnapIdx, _}, [{CPIdx, CPTerm} | _]} when CPIdx > SnapIdx -> + make_snapshot_dir(CheckpointDir, CPIdx, CPTerm); + {{Idx, Term}, _} -> + make_snapshot_dir(SnapDir, Idx, Term); + {undefined, [{Idx, Term} | _]} -> + make_snapshot_dir(CheckpointDir, Idx, Term) + end, + Mod:recover(Dir). -spec read_meta(Module :: module(), Location :: file:filename()) -> {ok, meta()} | From f8f51e691bde9d081635a812a0de5541ef62ecd9 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:40 -0500 Subject: [PATCH 10/17] Add ra_log config for the checkpoint interval Initially I have this set at 4x the snapshot interval since you might spam the `checkpoint` effect more than you would a `release_cursor` effect. --- src/ra_log.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index 12c8eb75..7b5ec77b 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -54,6 +54,7 @@ -define(DEFAULT_RESEND_WINDOW_SEC, 20). -define(SNAPSHOT_INTERVAL, 4096). +-define(MIN_CHECKPOINT_INTERVAL, 16384). -define(LOG_APPEND_TIMEOUT, 5000). -type ra_meta_key() :: atom(). @@ -84,6 +85,7 @@ log_id :: unicode:chardata(), directory :: file:filename(), snapshot_interval = ?SNAPSHOT_INTERVAL :: non_neg_integer(), + min_checkpoint_interval = ?MIN_CHECKPOINT_INTERVAL :: non_neg_integer(), snapshot_module :: module(), resend_window_seconds = ?DEFAULT_RESEND_WINDOW_SEC :: integer(), wal :: atom(), @@ -114,6 +116,7 @@ system_config => ra_system:config(), log_id => unicode:chardata(), snapshot_interval => non_neg_integer(), + min_checkpoint_interval => non_neg_integer(), resend_window => integer(), max_open_segments => non_neg_integer(), snapshot_module => module(), @@ -152,6 +155,7 @@ init(#{uid := UId, LogId = maps:get(log_id, Conf, UId), ResendWindow = maps:get(resend_window, Conf, ?DEFAULT_RESEND_WINDOW_SEC), SnapInterval = maps:get(snapshot_interval, Conf, ?SNAPSHOT_INTERVAL), + CPInterval = maps:get(checkpoint_interval, Conf, ?CHECKPOINT_INTERVAL), SnapshotsDir = filename:join(Dir, "snapshots"), CheckpointsDir = filename:join(Dir, "checkpoints"), Counter = maps:get(counter, Conf, undefined), @@ -195,6 +199,7 @@ init(#{uid := UId, uid = UId, log_id = LogId, snapshot_interval = SnapInterval, + min_checkpoint_interval = CPInterval, wal = Wal, segment_writer = SegWriter, resend_window_seconds = ResendWindow, @@ -731,8 +736,7 @@ should_snapshot(snapshot, Idx, ra_log_reader:segment_refs(Reader)), CanFreeSegments orelse Idx > SnapLimit; should_snapshot(checkpoint, Idx, - #?MODULE{cfg = #cfg{snapshot_interval = CheckpointInter}, - %% ^ TODO: use new cfg var. + #?MODULE{cfg = #cfg{min_checkpoint_interval = CheckpointInter}, snapshot_state = SnapState}) -> CheckpointLimit = case ra_snapshot:latest_checkpoint(SnapState) of undefined -> CheckpointInter; From 2ecaf2be392625c04038ff708f091507b638e165 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:41 -0500 Subject: [PATCH 11/17] Document checkpointing --- docs/internals/INTERNALS.md | 21 +++++++++++++++++++++ docs/internals/STATE_MACHINE_TUTORIAL.md | 8 ++++++++ src/ra_machine.erl | 9 +++++++-- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/docs/internals/INTERNALS.md b/docs/internals/INTERNALS.md index b533a6ff..1ca18607 100644 --- a/docs/internals/INTERNALS.md +++ b/docs/internals/INTERNALS.md @@ -188,6 +188,27 @@ It is not guaranteed that a snapshot will be taken. A decision to take a snapshot or to delay it is taken using a number of internal Ra state factors. The goal is to minimise disk I/O activity when possible. +### Checkpointing + +Checkpoints are nearly the same concept as snapshots. Snapshotting truncates +the log up to the snapshot's index, which might be undesirable for machines +which read from the log with the `{log, Indexes, Fun}` effect mentioned above. + +The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to +trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes +and when a checkpoint is taken, the machine state is saved to disk and can be +used for recovery when the machine restarts. A checkpoint being written does +not trigger any log truncation though. + +The `{release_cursor, RaftIndex}` effect can then be used to promote any +existing checkpoint older than or equal to `RaftIndex` into a proper snapshot, +and any log entries older than the checkpoint's index are then truncated. + +These two effects are intended for machines that use the `{log, Indexes, Fun}` +effect and can substantially improve machine recovery time compared to +snapshotting alone, especially when the machine needs to keep old log entries +around for a long time. + ## State Machine Versioning It is eventually necessary to make changes to the state machine diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md index b1a25266..08bed484 100644 --- a/docs/internals/STATE_MACHINE_TUTORIAL.md +++ b/docs/internals/STATE_MACHINE_TUTORIAL.md @@ -218,3 +218,11 @@ or similar. To (potentially) trigger a snapshot return the `{release_cursor, RaftIndex, MachineState}` effect. This is why the raft index is included in the `apply/3` function. Ra will only create a snapshot if doing so will result in log segments being deleted. + +For machines that must keep log segments on disk for some time, the +`{checkpoint, RaftIndex, MachineState}` effect can be used. This creates a +snapshot-like view of the machine state on disk but doesn't trigger log +truncation. Checkpoints can later be promoted to snapshots and trigger log +truncation by emitting a `{release_cursor, RaftIndex}` effect. The most +recent checkpoint with an index smaller than or equal to `RaftIndex` will be +promoted. diff --git a/src/ra_machine.erl b/src/ra_machine.erl index 99c372f6..4734019a 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -146,8 +146,13 @@ %% forcing a GC run. %% %% Although both leaders and followers will process the same commands, effects -%% are typically only applied on the leader. The only exception to this is -%% the `release_cursor' and `garbage_collect' effects. The former is realised on all +%% are typically only applied on the leader. The only exceptions to this are: +%%
    +%%
  • `release_cursor'
  • +%%
  • `checkpoint'
  • +%%
  • `garbage_collect'
  • +%%
+%% The former two are realised on all %% nodes as it is a part of the Ra implementation log truncation mechanism. %% The `garbage_collect' effects that is used to explicitly triggering a GC run %% in the Ra servers' process. From 523260dfedeeb5cfc42b967d0032a217d29eec0b Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 26 Jan 2024 16:04:43 -0500 Subject: [PATCH 12/17] Add a test suite for checkpointing --- test/ra_checkpoint_SUITE.erl | 355 +++++++++++++++++++++++++++++++++++ 1 file changed, 355 insertions(+) create mode 100644 test/ra_checkpoint_SUITE.erl diff --git a/test/ra_checkpoint_SUITE.erl b/test/ra_checkpoint_SUITE.erl new file mode 100644 index 00000000..272ca7e5 --- /dev/null +++ b/test/ra_checkpoint_SUITE.erl @@ -0,0 +1,355 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +%% +-module(ra_checkpoint_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + init_empty, + take_checkpoint, + take_checkpoint_crash, + recover_from_checkpoint_only, + recover_from_checkpoint_and_snapshot, + newer_snapshot_deletes_older_checkpoints, + init_recover_corrupt, + init_recover_multi_corrupt + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config) -> + ok = ra_snapshot:init_ets(), + SnapDir = filename:join([?config(priv_dir, Config), + TestCase, "snapshots"]), + CheckpointDir = filename:join([?config(priv_dir, Config), + TestCase, "checkpoints"]), + ok = ra_lib:make_dir(SnapDir), + ok = ra_lib:make_dir(CheckpointDir), + [{uid, ra_lib:to_binary(TestCase)}, + {snap_dir, SnapDir}, + {checkpoint_dir, CheckpointDir} | Config]. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +init_empty(Config) -> + State = init_state(Config), + undefined = ra_snapshot:latest_checkpoint(State), + + ok. + +take_checkpoint(Config) -> + State0 = init_state(Config), + + Meta = meta(55, 2, [node()]), + MacRef = ?FUNCTION_NAME, + {State1, [{monitor, process, snapshot_writer, Pid}]} = + ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0), + undefined = ra_snapshot:latest_checkpoint(State1), + {Pid, {55, 2}, checkpoint} = ra_snapshot:pending(State1), + receive + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} -> + State = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1), + undefined = ra_snapshot:pending(State), + {55, 2} = ra_snapshot:latest_checkpoint(State), + ok + after 1000 -> + error(snapshot_event_timeout) + end, + + ok. + +take_checkpoint_crash(Config) -> + State0 = init_state(Config), + Meta = meta(55, 2, [node()]), + MacRef = ?FUNCTION_NAME, + {State1, [{monitor, process, snapshot_writer, Pid}]} = + ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0), + undefined = ra_snapshot:latest_checkpoint(State1), + {Pid, {55, 2}, checkpoint} = ra_snapshot:pending(State1), + receive + {ra_log_event, _} -> + %% Just pretend the snapshot event didn't happen + %% and the process instead crashed. + ok + after 10 -> ok + end, + + State = ra_snapshot:handle_down(Pid, it_crashed_dawg, State1), + %% If the checkpoint process crashed we just have to consider the + %% checkpoint as faulty and clear it up. + undefined = ra_snapshot:pending(State), + undefined = ra_snapshot:latest_checkpoint(State), + + %% The written checkpoint should be removed. + ?assertEqual([], list_checkpoint_dirs(Config)), + + ok. + +recover_from_checkpoint_only(Config) -> + State0 = init_state(Config), + {error, no_current_snapshot} = ra_snapshot:recover(State0), + + Meta = meta(55, 2, [node()]), + {State1, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, checkpoint, State0), + receive + {ra_log_event, {snapshot_written, IdxTerm, checkpoint}} -> + _ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1), + ok + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Open a new snapshot state to simulate a restart. + Recover = init_state(Config), + undefined = ra_snapshot:pending(Recover), + {55, 2} = ra_snapshot:latest_checkpoint(Recover), + undefined = ra_snapshot:current(Recover), + + {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), + + ok. + +recover_from_checkpoint_and_snapshot(Config) -> + State0 = init_state(Config), + {error, no_current_snapshot} = ra_snapshot:recover(State0), + + %% Snapshot. + SnapMeta = meta(55, 2, [node()]), + {State1, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(SnapMeta, ?FUNCTION_NAME, snapshot, State0), + State2 = receive + {ra_log_event, {snapshot_written, IdxTerm1, snapshot}} -> + ra_snapshot:complete_snapshot(IdxTerm1, snapshot, State1) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Checkpoint at a later index. + CPMeta = meta(105, 3, [node()]), + {State3, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(CPMeta, ?FUNCTION_NAME, checkpoint, State2), + receive + {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} -> + _ = ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3), + ok + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Open a new snapshot state to simulate a restart. + Recover = init_state(Config), + undefined = ra_snapshot:pending(Recover), + %% Both the checkpoint and the snapshot exist. + {105, 3} = ra_snapshot:latest_checkpoint(Recover), + {55, 2} = ra_snapshot:current(Recover), + %% The checkpoint is used for recovery since it is newer. + {ok, CPMeta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), + + ok. + +newer_snapshot_deletes_older_checkpoints(Config) -> + State0 = init_state(Config), + {error, no_current_snapshot} = ra_snapshot:recover(State0), + + %% Checkpoint at 25. + CP1Meta = meta(25, 2, [node()]), + {State1, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(CP1Meta, ?FUNCTION_NAME, checkpoint, State0), + State2 = receive + {ra_log_event, {snapshot_written, IdxTerm1, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Checkpoint at 35. + CP2Meta = meta(35, 3, [node()]), + {State3, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(CP2Meta, ?FUNCTION_NAME, checkpoint, State2), + State4 = receive + {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Checkpoint at 55. + CP3Meta = meta(55, 5, [node()]), + {State5, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(CP3Meta, ?FUNCTION_NAME, checkpoint, State4), + State6 = receive + {ra_log_event, {snapshot_written, IdxTerm3, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm3, checkpoint, State5) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Snapshot at 45. + SnapMeta = meta(45, 4, [node()]), + {State7, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(SnapMeta, ?FUNCTION_NAME, snapshot, State6), + State8 = receive + {ra_log_event, {snapshot_written, IdxTerm4, snapshot}} -> + ra_snapshot:complete_snapshot(IdxTerm4, snapshot, State7) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% The first and second checkpoint are older than the snapshot. + {_State, [{35, 3}, {25, 2}]} = + ra_snapshot:take_older_checkpoints(45, State8), + + %% Open a new snapshot state to simulate a restart. + Recover = init_state(Config), + undefined = ra_snapshot:pending(Recover), + %% Both the latest checkpoint and the snapshot exist. + {55, 5} = ra_snapshot:latest_checkpoint(Recover), + {45, 4} = ra_snapshot:current(Recover), + %% The latest checkpoint has the highest index so it is used for recovery. + {ok, CP3Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), + + %% Initializing the state removes any checkpoints older than the snapshot, + %% so there should be one snapshot and one checkpoint only. + ?assertMatch([_], list_snap_dirs(Config)), + ?assertMatch([_], list_checkpoint_dirs(Config)), + + ok. + +init_recover_corrupt(Config) -> + State0 = init_state(Config), + + %% Take a checkpoint. + Meta = meta(55, 2, [node()]), + MacRef = ?FUNCTION_NAME, + {State1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0), + receive + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} -> + _ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1), + ok + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Delete the file but leave the directory intact. + CorruptDir = filename:join(?config(checkpoint_dir, Config), + ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(55)), + ok = file:delete(filename:join(CorruptDir, "snapshot.dat")), + + Recover = init_state(Config), + %% The checkpoint isn't recovered and the directory is cleaned up. + undefined = ra_snapshot:pending(Recover), + undefined = ra_snapshot:current(Recover), + undefined = ra_snapshot:latest_checkpoint(Recover), + {error, no_current_snapshot} = ra_snapshot:recover(Recover), + false = filelib:is_dir(CorruptDir), + + ok. + +init_recover_multi_corrupt(Config) -> + State0 = init_state(Config), + {error, no_current_snapshot} = ra_snapshot:recover(State0), + + %% Checkpoint at 55. + CP1Meta = meta(55, 2, [node()]), + {State1, _} = + ra_snapshot:begin_snapshot(CP1Meta, ?FUNCTION_NAME, checkpoint, State0), + State2 = receive + {ra_log_event, {snapshot_written, IdxTerm1, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Checkpoint at 165. + CP2Meta = meta(165, 2, [node()]), + {State3, _} = + ra_snapshot:begin_snapshot(CP2Meta, ?FUNCTION_NAME, checkpoint, State2), + State4 = receive + {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3) + after 1000 -> + error(snapshot_event_timeout) + end, + {165, 2} = ra_snapshot:latest_checkpoint(State4), + + %% Corrupt the latest checkpoint. + Corrupt = filename:join(?config(checkpoint_dir, Config), + ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(165)), + ok = file:delete(filename:join(Corrupt, "snapshot.dat")), + + %% Open a new snapshot state to simulate a restart. + Recover = init_state(Config), + undefined = ra_snapshot:pending(Recover), + %% The latest non-corrupt checkpoint is now the latest checkpoint. + {55, 2} = ra_snapshot:latest_checkpoint(Recover), + %% The corrupt checkpoint is cleaned up. + false = filelib:is_dir(Corrupt), + + {ok, CP1Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), + + ok. + +%%%=================================================================== +%%% Helper functions +%%%=================================================================== + +init_state(Config) -> + ra_snapshot:init(?config(uid, Config), + ra_log_snapshot, + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined). + +meta(Idx, Term, Cluster) -> + #{index => Idx, + term => Term, + cluster => Cluster, + machine_version => 1}. + +list_checkpoint_dirs(Config) -> + CPDir = ?config(checkpoint_dir, Config), + filelib:wildcard(filename:join(CPDir, "*")). + +list_snap_dirs(Config) -> + SnapDir = ?config(snap_dir, Config), + filelib:wildcard(filename:join(SnapDir, "*")). From 12653ca4afbe63d51a23e34767cf596df802fc61 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 5 Feb 2024 14:43:24 -0500 Subject: [PATCH 13/17] Only fsync checkpoints before promotion Snapshots must be fsync'd so that we can safely truncate the log. There's no need to fsync every checkpoint though: we only care about a checkpoint surviving its write to disk when we go to promote it. So we can move the call to `file:sync/1` into the promotion task. By promotion time it's possible that the file will already be synced. --- src/ra_lib.erl | 46 ++++++++++++++++++++++++++++------ src/ra_log_snapshot.erl | 15 ++++++++--- src/ra_snapshot.erl | 24 +++++++++++++++--- test/ra_log_snapshot_SUITE.erl | 10 ++++---- 4 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/ra_lib.erl b/src/ra_lib.erl index 5e9c6c82..9a7e72f0 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -39,6 +39,8 @@ retry/2, retry/3, write_file/2, + write_file/3, + sync_file/1, lists_chunk/2, lists_detect_sort/1, lists_shuffle/1, @@ -50,6 +52,10 @@ maps_merge_with/3 ]). +-type file_err() :: file:posix() | badarg | terminated | system_limit. + +-export_type([file_err/0]). + -include_lib("kernel/include/file.hrl"). ceiling(X) when X < 0 -> @@ -314,18 +320,23 @@ retry(Func, Attempt, Sleep) -> retry(Func, Attempt - 1) end. - +-spec write_file(file:name_all(), iodata()) -> + ok | file_err(). write_file(Name, IOData) -> + write_file(Name, IOData, true). + +-spec write_file(file:name_all(), iodata(), Sync :: boolean()) -> + ok | file_err(). +write_file(Name, IOData, Sync) -> case file:open(Name, [binary, write, raw]) of {ok, Fd} -> case file:write(Fd, IOData) of ok -> - case file:sync(Fd) of - ok -> - file:close(Fd); - Err -> - _ = file:close(Fd), - Err + case Sync of + true -> + sync_and_close_fd(Fd); + false -> + ok end; Err -> _ = file:close(Fd), @@ -335,6 +346,27 @@ write_file(Name, IOData) -> Err end. +-spec sync_file(file:name_all()) -> + ok | file_err(). +sync_file(Name) -> + case file:open(Name, [binary, write, raw]) of + {ok, Fd} -> + sync_and_close_fd(Fd); + Err -> + Err + end. + +-spec sync_and_close_fd(file:fd()) -> + ok | file_err(). +sync_and_close_fd(Fd) -> + case file:sync(Fd) of + ok -> + file:close(Fd); + Err -> + _ = file:close(Fd), + Err + end. + lists_chunk(0, List) -> error(invalid_size, [0, List]); lists_chunk(Size, List) -> diff --git a/src/ra_log_snapshot.erl b/src/ra_log_snapshot.erl index e9cd5f95..4e2b147c 100644 --- a/src/ra_log_snapshot.erl +++ b/src/ra_log_snapshot.erl @@ -11,7 +11,8 @@ -export([ prepare/2, - write/3, + write/4, + sync/1, begin_accept/2, accept_chunk/2, complete_accept/2, @@ -42,9 +43,9 @@ prepare(_Index, State) -> State. %% Snapshot Data (binary) %% @end --spec write(file:filename(), meta(), term()) -> +-spec write(file:filename(), meta(), term(), Sync :: boolean()) -> ok | {error, file_err()}. -write(Dir, Meta, MacState) -> +write(Dir, Meta, MacState, Sync) -> %% no compression on meta data to make sure reading it is as fast %% as possible MetaBin = term_to_binary(Meta), @@ -55,7 +56,13 @@ write(Dir, Meta, MacState) -> ra_lib:write_file(File, [<>, - Data]). + Data], Sync). + +-spec sync(file:filename()) -> + ok | {error, file_err()}. +sync(Dir) -> + File = filename(Dir), + ra_lib:sync_file(File). begin_accept(SnapDir, Meta) -> File = filename(SnapDir), diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index ec5a4179..0cf80c2f 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -8,7 +8,7 @@ -include("ra.hrl"). --type file_err() :: file:posix() | badarg | terminated | system_limit. +-type file_err() :: ra_lib:file_err(). %% alias -type meta() :: snapshot_meta(). @@ -104,13 +104,20 @@ %% Saves snapshot from external state to disk. %% Runs in a separate process. %% External storage should be available to read +%% `Sync' suggests whether the file should be synchronized with `fsync(1)'. -callback write(Location :: file:filename(), Meta :: meta(), - Ref :: term()) -> + Ref :: term(), + Sync :: boolean()) -> ok | {ok, Bytes :: non_neg_integer()} | {error, file_err() | term()}. +%% Synchronizes the snapshot to disk. +-callback sync(Location :: file:filename()) -> + ok | + {error, file_err() | term()}. + %% Read the snapshot metadata and initialise a read state used in read_chunk/1 %% The read state should contain all the information required to read a chunk @@ -305,6 +312,9 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, checkpoint -> {?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, CheckpointDir} end, + %% Snapshots must be fsync'd but checkpoints are OK to not sync. + %% Checkpoints are fsync'd before promotion instead. + Sync = SnapKind =:= snapshot, %% create directory for this snapshot SnapDir = make_snapshot_dir(Dir, Idx, Term), %% call prepare then write_snapshot @@ -315,7 +325,7 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, Self = self(), Pid = spawn(fun () -> ok = ra_lib:make_dir(SnapDir), - case Mod:write(SnapDir, Meta, Ref) of + case Mod:write(SnapDir, Meta, Ref, Sync) of ok -> ok; {ok, BytesWritten} -> counters_add(Counter, CounterIdx, @@ -335,7 +345,8 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind, -spec promote_checkpoint(Idx :: ra_index(), State0 :: state()) -> {State :: state(), Effects :: [effect()]}. promote_checkpoint(PromotionIdx, - #?MODULE{snapshot_directory = SnapDir, + #?MODULE{module = Mod, + snapshot_directory = SnapDir, checkpoint_directory = CheckpointDir, checkpoints = Checkpoints0} = State0) -> %% Find the checkpoint with the highest index smaller than or equal to the @@ -347,6 +358,11 @@ promote_checkpoint(PromotionIdx, Snapshot = make_snapshot_dir(SnapDir, Idx, Term), Self = self(), Pid = spawn(fun() -> + %% Checkpoints are created without calling + %% fsync. Snapshots must be fsync'd though, so + %% sync the checkpoint before promoting it + %% into a snapshot. + ok = Mod:sync(Checkpoint), ok = file:rename(Checkpoint, Snapshot), Self ! {ra_log_event, {snapshot_written, diff --git a/test/ra_log_snapshot_SUITE.erl b/test/ra_log_snapshot_SUITE.erl index b7188955..adfd560c 100644 --- a/test/ra_log_snapshot_SUITE.erl +++ b/test/ra_log_snapshot_SUITE.erl @@ -73,7 +73,7 @@ roundtrip(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef), + ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => true}, ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)), ok. @@ -82,7 +82,7 @@ roundtrip_compat(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef), + ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)), ok. @@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) -> ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = crypto:strong_rand_bytes(DataSize), - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef), + ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => FullFile}, {ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context), %% how to ensure @@ -180,7 +180,7 @@ read_meta_data(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef), + ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), {ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir), ok. @@ -188,7 +188,7 @@ recover_same_as_read(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotData = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData), + ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true), {ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir), ok. From acd5618f4d94e1fdba5333a88e95286cec509c6f Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 5 Feb 2024 15:03:23 -0500 Subject: [PATCH 14/17] ra_snapshot_SUITE: Add a helper around ra_snapshot:init/5 This is the same helper as I added for `ra_checkpoint_SUITE`. Having the helper makes it a little easier to add new parameters to `ra_snapshot:init/X` (see the child commit). --- test/ra_snapshot_SUITE.erl | 89 ++++++++++---------------------------- 1 file changed, 24 insertions(+), 65 deletions(-) diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index b41ff89b..b74e2e9f 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -79,10 +79,7 @@ end_per_testcase(_TestCase, _Config) -> init_empty(Config) -> UId = ?config(uid, Config), - State = ra_snapshot:init(UId, ?MODULE, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State = init_state(Config), %% no pending, no current undefined = ra_snapshot:current(State), undefined = ra_snapshot:pending(State), @@ -92,10 +89,7 @@ init_empty(Config) -> take_snapshot(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = @@ -117,8 +111,7 @@ take_snapshot(Config) -> take_snapshot_crash(Config) -> UId = ?config(uid, Config), SnapDir = ?config(snap_dir, Config), - CPDir = ?config(checkpoint_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), MacRef = ?FUNCTION_NAME, {State1, [{monitor, process, snapshot_writer, Pid}]} = @@ -147,10 +140,7 @@ take_snapshot_crash(Config) -> init_recover(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), {State1, [{monitor, process, snapshot_writer, _}]} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), @@ -163,10 +153,7 @@ init_recover(Config) -> end, %% open a new snapshot state to simulate a restart - Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + Recover = init_state(Config), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -179,10 +166,7 @@ init_recover(Config) -> init_recover_voter_status(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, #{node() => #{voter_status => test}}), {State1, [{monitor, process, snapshot_writer, _}]} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), @@ -195,10 +179,7 @@ init_recover_voter_status(Config) -> end, %% open a new snapshot state to simulate a restart - Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + Recover = init_state(Config), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -211,10 +192,7 @@ init_recover_voter_status(Config) -> init_multi(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot, @@ -239,10 +217,7 @@ init_multi(Config) -> end, %% open a new snapshot state to simulate a restart - Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + Recover = init_state(Config), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -256,8 +231,7 @@ init_multi(Config) -> init_recover_multi_corrupt(Config) -> UId = ?config(uid, Config), SnapsDir = ?config(snap_dir, Config), - CPDir = ?config(checkpoint_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, CPDir, undefined), + State0 = init_state(Config), Meta1 = meta(55, 2, [node()]), Meta2 = meta(165, 2, [node()]), {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot, @@ -286,10 +260,7 @@ init_recover_multi_corrupt(Config) -> ok = file:delete(filename:join(Corrupt, "snapshot.dat")), %% open a new snapshot state to simulate a restart - Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + Recover = init_state(Config), %% ensure last snapshot is recovered %% it also needs to be validated as could have crashed mid write undefined = ra_snapshot:pending(Recover), @@ -307,8 +278,7 @@ init_recover_corrupt(Config) -> UId = ?config(uid, Config), Meta = meta(55, 2, [node()]), SnapsDir = ?config(snap_dir, Config), - CPDir = ?config(checkpoint_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, CPDir, undefined), + State0 = init_state(Config), {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0), _ = receive @@ -326,10 +296,7 @@ init_recover_corrupt(Config) -> %% clear out ets table ets:delete_all_objects(ra_log_snapshot_state), %% open a new snapshot state to simulate a restart - Recover = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + Recover = init_state(Config), %% ensure the corrupt snapshot isn't recovered undefined = ra_snapshot:pending(Recover), undefined = ra_snapshot:current(Recover), @@ -339,11 +306,7 @@ init_recover_corrupt(Config) -> ok. read_snapshot(Config) -> - UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), MacRef = crypto:strong_rand_bytes(1024 * 4), {State1, _} = @@ -373,10 +336,7 @@ read_all_chunks(ChunkState, State, Size, Acc) -> accept_snapshot(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), MetaBin = term_to_binary(Meta), MacRef = crypto:strong_rand_bytes(1024 * 4), @@ -408,10 +368,7 @@ accept_snapshot(Config) -> abort_accept(Config) -> UId = ?config(uid, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, - ?config(snap_dir, Config), - ?config(checkpoint_dir, Config), - undefined), + State0 = init_state(Config), Meta = meta(55, 2, [node()]), MacRef = crypto:strong_rand_bytes(1024 * 4), MacBin = term_to_binary(MacRef), @@ -436,9 +393,7 @@ abort_accept(Config) -> accept_receives_snapshot_written_with_lower_index(Config) -> UId = ?config(uid, Config), - SnapDir = ?config(snap_dir, Config), - CPDir = ?config(checkpoint_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), + State0 = init_state(Config), MetaLocal = meta(55, 2, [node()]), MetaRemote = meta(165, 2, [node()]), MetaRemoteBin = term_to_binary(MetaRemote), @@ -478,9 +433,7 @@ accept_receives_snapshot_written_with_lower_index(Config) -> accept_receives_snapshot_written_with_higher_index(Config) -> UId = ?config(uid, Config), - SnapDir = ?config(snap_dir, Config), - CPDir = ?config(checkpoint_dir, Config), - State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, CPDir, undefined), + State0 = init_state(Config), MetaRemote = meta(55, 2, [node()]), MetaLocal = meta(165, 2, [node()]), %% begin a local snapshot @@ -515,6 +468,12 @@ accept_receives_snapshot_written_with_higher_index(Config) -> end, ok. +init_state(Config) -> + ra_snapshot:init(?config(uid, Config), ra_log_snapshot, + ?config(snap_dir, Config), + ?config(checkpoint_dir, Config), + undefined). + meta(Idx, Term, Cluster) -> #{index => Idx, term => Term, From 47e073395d18ac31319f27fcc586d695dba4f7ce Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 5 Feb 2024 15:05:29 -0500 Subject: [PATCH 15/17] Allow configuring the maximum number of checkpoints --- src/ra.hrl | 2 ++ src/ra_log.erl | 9 ++++++--- src/ra_snapshot.erl | 22 +++++++++++----------- test/ra_checkpoint_SUITE.erl | 6 ++++-- test/ra_log_2_SUITE.erl | 2 +- test/ra_snapshot_SUITE.erl | 6 ++++-- 6 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index 8d8d16be..48402617 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -230,6 +230,8 @@ -define(DEFAULT_SNAPSHOT_MODULE, ra_log_snapshot). +-define(DEFAULT_MAX_CHECKPOINTS, 10). + -define(RA_LOG_COUNTER_FIELDS, [{write_ops, ?C_RA_LOG_WRITE_OPS, counter, "Total number of write ops"}, diff --git a/src/ra_log.erl b/src/ra_log.erl index 7b5ec77b..2142ac43 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -136,10 +136,11 @@ pre_init(#{uid := UId, system_config := #{data_dir := DataDir}} = Conf) -> Dir = server_data_dir(DataDir, UId), SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE), + MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS), SnapshotsDir = filename:join(Dir, "snapshots"), CheckpointsDir = filename:join(Dir, "checkpoints"), _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir, - CheckpointsDir, undefined), + CheckpointsDir, undefined, MaxCheckpoints), ok. -spec init(ra_log_init_args()) -> state(). @@ -155,7 +156,9 @@ init(#{uid := UId, LogId = maps:get(log_id, Conf, UId), ResendWindow = maps:get(resend_window, Conf, ?DEFAULT_RESEND_WINDOW_SEC), SnapInterval = maps:get(snapshot_interval, Conf, ?SNAPSHOT_INTERVAL), - CPInterval = maps:get(checkpoint_interval, Conf, ?CHECKPOINT_INTERVAL), + CPInterval = maps:get(min_checkpoint_interval, Conf, + ?MIN_CHECKPOINT_INTERVAL), + MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS), SnapshotsDir = filename:join(Dir, "snapshots"), CheckpointsDir = filename:join(Dir, "checkpoints"), Counter = maps:get(counter, Conf, undefined), @@ -167,7 +170,7 @@ init(#{uid := UId, % initialise metrics for this server true = ets:insert(ra_log_metrics, {UId, 0, 0, 0, 0}), SnapshotState = ra_snapshot:init(UId, SnapModule, SnapshotsDir, - CheckpointsDir, Counter), + CheckpointsDir, Counter, MaxCheckpoints), {SnapIdx, SnapTerm} = case ra_snapshot:current(SnapshotState) of undefined -> {-1, -1}; Curr -> Curr diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 0cf80c2f..98d4ae08 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -20,7 +20,7 @@ read_chunk/3, delete/2, - init/5, + init/6, init_ets/0, current/1, pending/1, @@ -82,13 +82,11 @@ pending :: option({pid(), ra_idxterm(), kind()}), accepting :: option(#accept{}), current :: option(ra_idxterm()), - checkpoints = [] :: list(checkpoint())}). + checkpoints = [] :: list(checkpoint()), + max_checkpoints :: pos_integer()}). -define(ETSTBL, ra_log_snapshot_state). -%% TODO: Make this constant configurable? --define(MAX_CHECKPOINTS, 10). - -opaque state() :: #?MODULE{}. -export_type([state/0]). @@ -170,14 +168,15 @@ -callback context() -> map(). -spec init(ra_uid(), module(), file:filename(), file:filename(), - undefined | counters:counters_ref()) -> + undefined | counters:counters_ref(), pos_integer()) -> state(). -init(UId, Module, SnapshotsDir, CheckpointDir, Counter) -> +init(UId, Module, SnapshotsDir, CheckpointDir, Counter, MaxCheckpoints) -> State = #?MODULE{uid = UId, counter = Counter, module = Module, snapshot_directory = SnapshotsDir, - checkpoint_directory = CheckpointDir}, + checkpoint_directory = CheckpointDir, + max_checkpoints = MaxCheckpoints}, State1 = find_snapshots(State), find_checkpoints(State1). @@ -576,13 +575,14 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) -> -spec take_extra_checkpoints(state()) -> {state(), [checkpoint()]}. -take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0} = State0) -> +take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0, + max_checkpoints = MaxCheckpoints} = State0) -> Len = erlang:length(Checkpoints0), - case Len - ?MAX_CHECKPOINTS of + case Len - MaxCheckpoints of ToDelete when ToDelete > 0 -> %% Take `ToDelete' checkpoints from the list randomly without %% ever taking the first or last checkpoint. - IdxsToTake = random_idxs_to_take(?MAX_CHECKPOINTS, ToDelete), + IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete), {Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake), {State0#?MODULE{checkpoints = Checkpoints}, Extras}; _ -> diff --git a/test/ra_checkpoint_SUITE.erl b/test/ra_checkpoint_SUITE.erl index 272ca7e5..bfcdfe28 100644 --- a/test/ra_checkpoint_SUITE.erl +++ b/test/ra_checkpoint_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include("src/ra.hrl"). %%%=================================================================== %%% Common Test callbacks @@ -61,7 +62,8 @@ init_per_testcase(TestCase, Config) -> ok = ra_lib:make_dir(CheckpointDir), [{uid, ra_lib:to_binary(TestCase)}, {snap_dir, SnapDir}, - {checkpoint_dir, CheckpointDir} | Config]. + {checkpoint_dir, CheckpointDir}, + {max_checkpoints, ?DEFAULT_MAX_CHECKPOINTS} | Config]. end_per_testcase(_TestCase, _Config) -> ok. @@ -338,7 +340,7 @@ init_state(Config) -> ra_log_snapshot, ?config(snap_dir, Config), ?config(checkpoint_dir, Config), - undefined). + undefined, ?config(max_checkpoints, Config)). meta(Idx, Term, Cluster) -> #{index => Idx, diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index bd0afce1..9a8cdd89 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -1370,7 +1370,7 @@ create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) -> ok = ra_lib:make_dir(OthDir), ok = ra_lib:make_dir(CPDir), Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot, - OthDir, CPDir, undefined), + OthDir, CPDir, undefined, ?DEFAULT_MAX_CHECKPOINTS), MacRef = <<"9">>, {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, Sn0), Sn2 = diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index b74e2e9f..cf078b5b 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -14,6 +14,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include("src/ra.hrl"). %%%=================================================================== %%% Common Test callbacks @@ -68,7 +69,8 @@ init_per_testcase(TestCase, Config) -> ok = ra_lib:make_dir(CheckpointDir), [{uid, ra_lib:to_binary(TestCase)}, {snap_dir, SnapDir}, - {checkpoint_dir, CheckpointDir} | Config]. + {checkpoint_dir, CheckpointDir}, + {max_checkpoints, ?DEFAULT_MAX_CHECKPOINTS} | Config]. end_per_testcase(_TestCase, _Config) -> ok. @@ -472,7 +474,7 @@ init_state(Config) -> ra_snapshot:init(?config(uid, Config), ra_log_snapshot, ?config(snap_dir, Config), ?config(checkpoint_dir, Config), - undefined). + undefined, ?config(max_checkpoints, Config)). meta(Idx, Term, Cluster) -> #{index => Idx, From 336bdedfb841d67d13b49fb76f20de5a3f6c2154 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 13 Feb 2024 09:30:08 -0500 Subject: [PATCH 16/17] Add latest_checkpoint_index to ra_log overview map --- src/ra_log.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/ra_log.erl b/src/ra_log.erl index 2142ac43..f7a85e41 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -796,6 +796,11 @@ overview(#?MODULE{last_index = LastIndex, undefined -> undefined; {I, _} -> I end, + latest_checkpoint_index => + case ra_snapshot:latest_checkpoint(SnapshotState) of + undefined -> undefined; + {I, _} -> I + end, cache_size => ra_log_cache:size(Cache) }. From d9890ccca4c2a7a8efae1b623536ca815dfdd0a6 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 13 Feb 2024 09:30:27 -0500 Subject: [PATCH 17/17] Add an integration test for checkpoints and promotion --- test/coordination_SUITE.erl | 96 +++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 4 deletions(-) diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 269aef8b..4a98a6ac 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -47,7 +47,8 @@ all_tests() -> leaderboard, bench, disconnected_node_catches_up, - key_metrics + key_metrics, + recover_from_checkpoint ]. groups() -> @@ -708,6 +709,86 @@ bench(Config) -> ra_lib:recursive_delete(PrivDir), ok. +recover_from_checkpoint(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + ServerNames = [s1, s2, s3], + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames], + Configs = [begin + UId = atom_to_binary(Name, utf8), + #{cluster_name => ClusterName, + id => NodeId, + uid => UId, + initial_members => ServerIds, + machine => {module, ?MODULE, #{}}, + log_init_args => #{uid => UId, + min_checkpoint_interval => 3, + snapshot_interval => 5}} + end || {Name, _Node} = NodeId <- ServerIds], + {ok, Started, []} = ra:start_cluster(?SYS, Configs), + {ok, _, Leader} = ra:members(hd(Started)), + [Follower1, Follower2] = ServerIds -- [Leader], + + %% Send five commands to trigger a snapshot. + [ok = ra:pipeline_command(Leader, N, no_correlation, normal) + || N <- lists:seq(1, 6)], + await_condition( + fun () -> + {ok, #{log := #{snapshot_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{snapshot_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{snapshot_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 6 andalso Follower1Idx =:= 6 andalso + Follower2Idx =:= 6 + end, 20), + + %% Trigger a checkpoint. + {ok, _, _} = ra:process_command(Leader, checkpoint), + await_condition( + fun () -> + {ok, #{log := #{latest_checkpoint_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{latest_checkpoint_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{latest_checkpoint_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso + Follower2Idx =:= 8 + end, 20), + + %% Restart the servers + [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], + [ok = ra:restart_server(?SYS, ServerId) || ServerId <- ServerIds], + + %% All servers should have recovered from their checkpoints since the + %% checkpoint has a higher index than the snapshot. + [{ok, {_CurrentIdx, _CheckpointIdx = 8}, _Leader} = + ra:local_query(ServerId, fun(State) -> + maps:get(checkpoint_index, State, + undefined) + end) || ServerId <- ServerIds], + + %% Promote the checkpoint into a snapshot. + {ok, _, _} = ra:process_command(Leader, promote_checkpoint), + await_condition( + fun () -> + {ok, #{log := #{snapshot_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{snapshot_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{snapshot_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso + Follower2Idx =:= 8 + end, 20), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + +%% Utility + test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) -> Opts = case Opts0 of local -> [local]; @@ -761,8 +842,6 @@ test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) -> flush(), ok. -%% Utility - get_current_host() -> NodeStr = atom_to_list(node()), Host = re:replace(NodeStr, "^[^@]+@", "", [{return, list}]), @@ -802,7 +881,7 @@ flush() -> %% ra_machine impl init(_) -> - {#{}, []}. + #{}. apply(_Meta, {send_local_msg, Pid, Opts}, State) -> {State, ok, [{send_msg, Pid, {local_msg, node()}, Opts}]}; @@ -815,6 +894,15 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) -> {State, ok, [Eff]}; apply(#{index := _Idx}, {data, _}, State) -> {State, ok, []}; +apply(#{index := Idx}, checkpoint, State) -> + %% Generally machines should save their state without any modifications + %% but we slightly modify the machine state we save in the checkpoint here + %% so that we can tell when we've recovered from a checkpoint rather than + %% a snapshot. + CheckpointState = maps:put(checkpoint_index, Idx, State), + {State, ok, [{checkpoint, Idx, CheckpointState}]}; +apply(#{index := Idx}, promote_checkpoint, State) -> + {State, ok, [{release_cursor, Idx}]}; apply(#{index := Idx}, _Cmd, State) -> {State, ok, [{release_cursor, Idx, State}]}.