diff --git a/docs/internals/INTERNALS.md b/docs/internals/INTERNALS.md
index b533a6ff..1ca18607 100644
--- a/docs/internals/INTERNALS.md
+++ b/docs/internals/INTERNALS.md
@@ -188,6 +188,27 @@ It is not guaranteed that a snapshot will be taken. A decision to take
a snapshot or to delay it is taken using a number of internal Ra state factors.
The goal is to minimise disk I/O activity when possible.
+### Checkpointing
+
+Checkpoints are nearly the same concept as snapshots. The difference is that
+snapshotting truncates the log up to the snapshot's index, which might be
+undesirable for machines that read from the log with the `{log, Indexes, Fun}`
+effect mentioned above.
+
+The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to
+trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes,
+and when a checkpoint is taken the machine state is saved to disk and can be
+used for recovery when the machine restarts. Writing a checkpoint, however,
+does not trigger any log truncation.
+
+The `{release_cursor, RaftIndex}` effect can then be used to promote the most
+recent checkpoint with an index smaller than or equal to `RaftIndex` into a
+proper snapshot; any log entries older than that checkpoint's index are then
+truncated.
+
+These two effects are intended for machines that use the `{log, Indexes, Fun}`
+effect and can substantially improve machine recovery time compared to
+snapshotting alone, especially when the machine needs to keep old log entries
+around for a long time.
+
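+A minimal sketch of a machine that uses both effects (the module name, the
+`increment`/`purge` commands and the 1000-command threshold are assumptions
+made up for illustration only):
+
+```erlang
+-module(counter_machine).
+-behaviour(ra_machine).
+-export([init/1, apply/3]).
+
+init(_Config) -> 0.
+
+apply(#{index := Idx}, increment, Count0) ->
+    Count = Count0 + 1,
+    %% Hint that a checkpoint may be taken every 1000 applied commands.
+    Effects = case Idx rem 1000 of
+                  0 -> [{checkpoint, Idx, Count}];
+                  _ -> []
+              end,
+    {Count, Count, Effects};
+apply(#{index := Idx}, purge, Count) ->
+    %% Promote the most recent checkpoint with an index =< Idx into a
+    %% snapshot; the log up to that checkpoint's index is then truncated.
+    {Count, ok, [{release_cursor, Idx}]}.
+```
+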
## State Machine Versioning
It is eventually necessary to make changes to the state machine
diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md
index b1a25266..08bed484 100644
--- a/docs/internals/STATE_MACHINE_TUTORIAL.md
+++ b/docs/internals/STATE_MACHINE_TUTORIAL.md
@@ -218,3 +218,11 @@ or similar.
To (potentially) trigger a snapshot return the `{release_cursor, RaftIndex, MachineState}`
effect. This is why the raft index is included in the `apply/3` function. Ra will
only create a snapshot if doing so will result in log segments being deleted.
+
+For machines that must keep log segments on disk for some time, the
+`{checkpoint, RaftIndex, MachineState}` effect can be used. This creates a
+snapshot-like view of the machine state on disk but doesn't trigger log
+truncation. Checkpoints can later be promoted to snapshots, triggering log
+truncation, by emitting a `{release_cursor, RaftIndex}` effect. The most
+recent checkpoint with an index smaller than or equal to `RaftIndex` will be
+promoted.
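+
+As a rough sketch, a machine could expose commands that emit these effects
+(the `take_checkpoint` and `promote` command names are assumptions made up
+for this example):
+
+```erlang
+apply(#{index := Idx}, take_checkpoint, State) ->
+    %% Save the machine state to disk without truncating the log.
+    {State, ok, [{checkpoint, Idx, State}]};
+apply(#{index := Idx}, promote, State) ->
+    %% Promote the most recent checkpoint with an index =< Idx into a
+    %% snapshot, which allows the log up to that index to be truncated.
+    {State, ok, [{release_cursor, Idx}]}.
+```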
diff --git a/src/ra.hrl b/src/ra.hrl
index 246283d0..48402617 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -230,6 +230,8 @@
-define(DEFAULT_SNAPSHOT_MODULE, ra_log_snapshot).
+-define(DEFAULT_MAX_CHECKPOINTS, 10).
+
-define(RA_LOG_COUNTER_FIELDS,
[{write_ops, ?C_RA_LOG_WRITE_OPS, counter,
"Total number of write ops"},
@@ -254,6 +256,10 @@
{snapshot_bytes_written, ?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, counter,
"Number of snapshot bytes written (not installed)"},
{open_segments, ?C_RA_LOG_OPEN_SEGMENTS, gauge, "Number of open segments"},
+ {checkpoints_written, ?C_RA_LOG_CHECKPOINTS_WRITTEN, counter,
+ "Total number of checkpoints written"},
+ {checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter,
+ "Number of checkpoint bytes written"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
@@ -268,7 +274,9 @@
-define(C_RA_LOG_SNAPSHOTS_INSTALLED, 10).
-define(C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, 11).
-define(C_RA_LOG_OPEN_SEGMENTS, 12).
--define(C_RA_LOG_RESERVED, 13).
+-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
+-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
+-define(C_RA_LOG_RESERVED, 15).
-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
@@ -290,7 +298,8 @@
-define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18).
-define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19).
-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20).
--define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21).
+-define(C_RA_SRV_CHECKPOINTS, ?C_RA_LOG_RESERVED + 21).
+-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 22).
-define(RA_SRV_COUNTER_FIELDS,
@@ -335,6 +344,8 @@
"Total number of local queries"},
{invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter,
"Total number of commands received with an invalid reply-mode"},
+ {checkpoints, ?C_RA_SRV_CHECKPOINTS, counter,
+ "The number of checkpoint effects executed"},
{reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"}
]).
@@ -345,6 +356,7 @@
-define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5).
-define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6).
-define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7).
+-define(C_RA_SVR_METRIC_CHECKPOINT_INDEX, ?C_RA_SRV_RESERVED + 8).
-define(RA_SRV_METRICS_COUNTER_FIELDS,
[
@@ -360,7 +372,9 @@
"The last fully written and fsynced index of the log."},
{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
"Approximate time taken from an entry being written to the log until it is committed."},
- {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."}
+ {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."},
+ {checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter,
+ "The current checkpoint index."}
]).
-define(RA_COUNTER_FIELDS,
diff --git a/src/ra_lib.erl b/src/ra_lib.erl
index 41995b73..9a7e72f0 100644
--- a/src/ra_lib.erl
+++ b/src/ra_lib.erl
@@ -39,8 +39,11 @@
retry/2,
retry/3,
write_file/2,
+ write_file/3,
+ sync_file/1,
lists_chunk/2,
lists_detect_sort/1,
+ lists_shuffle/1,
is_dir/1,
is_file/1,
ensure_dir/1,
@@ -49,6 +52,10 @@
maps_merge_with/3
]).
+-type file_err() :: file:posix() | badarg | terminated | system_limit.
+
+-export_type([file_err/0]).
+
-include_lib("kernel/include/file.hrl").
ceiling(X) when X < 0 ->
@@ -313,18 +320,23 @@ retry(Func, Attempt, Sleep) ->
retry(Func, Attempt - 1)
end.
-
+-spec write_file(file:name_all(), iodata()) ->
+ ok | file_err().
write_file(Name, IOData) ->
+ write_file(Name, IOData, true).
+
+-spec write_file(file:name_all(), iodata(), Sync :: boolean()) ->
+ ok | file_err().
+write_file(Name, IOData, Sync) ->
case file:open(Name, [binary, write, raw]) of
{ok, Fd} ->
case file:write(Fd, IOData) of
ok ->
- case file:sync(Fd) of
- ok ->
- file:close(Fd);
- Err ->
- _ = file:close(Fd),
- Err
+ case Sync of
+ true ->
+ sync_and_close_fd(Fd);
+ false ->
+ ok
end;
Err ->
_ = file:close(Fd),
@@ -334,6 +346,27 @@ write_file(Name, IOData) ->
Err
end.
+-spec sync_file(file:name_all()) ->
+ ok | file_err().
+sync_file(Name) ->
+ case file:open(Name, [binary, write, raw]) of
+ {ok, Fd} ->
+ sync_and_close_fd(Fd);
+ Err ->
+ Err
+ end.
+
+-spec sync_and_close_fd(file:fd()) ->
+ ok | file_err().
+sync_and_close_fd(Fd) ->
+ case file:sync(Fd) of
+ ok ->
+ file:close(Fd);
+ Err ->
+ _ = file:close(Fd),
+ Err
+ end.
+
lists_chunk(0, List) ->
error(invalid_size, [0, List]);
lists_chunk(Size, List) ->
@@ -382,6 +415,12 @@ do_ascending(A, [B | Rem])
do_ascending(_A, _) ->
unsorted.
+%% Reorder a list randomly.
+-spec lists_shuffle(list()) -> list().
+lists_shuffle(List0) ->
+ List1 = [{rand:uniform(), Elem} || Elem <- List0],
+ [Elem || {_, Elem} <- lists:keysort(1, List1)].
+
is_dir(Dir) ->
case prim_file:read_file_info(Dir) of
{ok, #file_info{type=directory}} ->
diff --git a/src/ra_log.erl b/src/ra_log.erl
index da712c4c..f7a85e41 100644
--- a/src/ra_log.erl
+++ b/src/ra_log.erl
@@ -31,6 +31,8 @@
recover_snapshot/1,
snapshot_index_term/1,
update_release_cursor/5,
+ checkpoint/5,
+ promote_checkpoint/2,
needs_cache_flush/1,
can_write/1,
@@ -52,6 +54,7 @@
-define(DEFAULT_RESEND_WINDOW_SEC, 20).
-define(SNAPSHOT_INTERVAL, 4096).
+-define(MIN_CHECKPOINT_INTERVAL, 16384).
-define(LOG_APPEND_TIMEOUT, 5000).
-type ra_meta_key() :: atom().
@@ -62,7 +65,7 @@
ToTerm :: ra_term()}} |
{segments, ets:tid(), [segment_ref()]} |
{resend_write, ra_index()} |
- {snapshot_written, ra_idxterm()} |
+ {snapshot_written, ra_idxterm(), ra_snapshot:kind()} |
{down, pid(), term()}.
-type event() :: {ra_log_event, event_body()}.
@@ -82,6 +85,7 @@
log_id :: unicode:chardata(),
directory :: file:filename(),
snapshot_interval = ?SNAPSHOT_INTERVAL :: non_neg_integer(),
+ min_checkpoint_interval = ?MIN_CHECKPOINT_INTERVAL :: non_neg_integer(),
snapshot_module :: module(),
resend_window_seconds = ?DEFAULT_RESEND_WINDOW_SEC :: integer(),
wal :: atom(),
@@ -112,6 +116,7 @@
system_config => ra_system:config(),
log_id => unicode:chardata(),
snapshot_interval => non_neg_integer(),
+ min_checkpoint_interval => non_neg_integer(),
resend_window => integer(),
max_open_segments => non_neg_integer(),
snapshot_module => module(),
@@ -131,8 +136,11 @@ pre_init(#{uid := UId,
system_config := #{data_dir := DataDir}} = Conf) ->
Dir = server_data_dir(DataDir, UId),
SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE),
+ MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
- _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir, undefined),
+ CheckpointsDir = filename:join(Dir, "checkpoints"),
+ _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir,
+ CheckpointsDir, undefined, MaxCheckpoints),
ok.
-spec init(ra_log_init_args()) -> state().
@@ -148,15 +156,21 @@ init(#{uid := UId,
LogId = maps:get(log_id, Conf, UId),
ResendWindow = maps:get(resend_window, Conf, ?DEFAULT_RESEND_WINDOW_SEC),
SnapInterval = maps:get(snapshot_interval, Conf, ?SNAPSHOT_INTERVAL),
+ CPInterval = maps:get(min_checkpoint_interval, Conf,
+ ?MIN_CHECKPOINT_INTERVAL),
+ MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
+ CheckpointsDir = filename:join(Dir, "checkpoints"),
Counter = maps:get(counter, Conf, undefined),
%% ensure directories are there
ok = ra_lib:make_dir(Dir),
ok = ra_lib:make_dir(SnapshotsDir),
+ ok = ra_lib:make_dir(CheckpointsDir),
% initialise metrics for this server
true = ets:insert(ra_log_metrics, {UId, 0, 0, 0, 0}),
- SnapshotState = ra_snapshot:init(UId, SnapModule, SnapshotsDir, Counter),
+ SnapshotState = ra_snapshot:init(UId, SnapModule, SnapshotsDir,
+ CheckpointsDir, Counter, MaxCheckpoints),
{SnapIdx, SnapTerm} = case ra_snapshot:current(SnapshotState) of
undefined -> {-1, -1};
Curr -> Curr
@@ -188,6 +202,7 @@ init(#{uid := UId,
uid = UId,
log_id = LogId,
snapshot_interval = SnapInterval,
+ min_checkpoint_interval = CPInterval,
wal = Wal,
segment_writer = SegWriter,
resend_window_seconds = ResendWindow,
@@ -499,37 +514,55 @@ handle_event({segments, Tid, NewSegs},
end),
{State, log_update_effects(Readers, Pid, State)}
end;
-handle_event({snapshot_written, {SnapIdx, _} = Snap},
+handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
#?MODULE{cfg = Cfg,
first_index = FstIdx,
snapshot_state = SnapState0} = State0)
%% only update snapshot if it is newer than the last snapshot
when SnapIdx >= FstIdx ->
- % delete any segments outside of first_index
- {State, Effects0} = delete_segments(SnapIdx, State0),
- SnapState = ra_snapshot:complete_snapshot(Snap, SnapState0),
- put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
- %% delete old snapshot files
- %% This is done as an effect
- %% so that if an old snapshot is still being replicated
- %% the cleanup can be delayed until it is safe
- Effects = [{delete_snapshot,
- ra_snapshot:directory(SnapState),
- ra_snapshot:current(SnapState0)} | Effects0],
- %% do not set last written index here as the snapshot may
- %% be for a past index
- {State#?MODULE{first_index = SnapIdx + 1,
- snapshot_state = SnapState}, Effects};
-handle_event({snapshot_written, {Idx, Term} = Snap},
+ SnapState1 = ra_snapshot:complete_snapshot(Snap, SnapKind, SnapState0),
+ case SnapKind of
+ snapshot ->
+ put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
+ % delete any segments outside of first_index
+ {State, Effects0} = delete_segments(SnapIdx, State0),
+ %% Delete old snapshot files. This is done as an effect
+ %% so that if an old snapshot is still being replicated
+ %% the cleanup can be delayed until it is safe.
+ DeleteCurrentSnap = {delete_snapshot,
+ ra_snapshot:directory(SnapState1, snapshot),
+ ra_snapshot:current(SnapState0)},
+ %% Also delete any checkpoints older than this snapshot.
+ {SnapState, Checkpoints} =
+ ra_snapshot:take_older_checkpoints(SnapIdx, SnapState1),
+ CPEffects = [{delete_snapshot,
+ ra_snapshot:directory(SnapState, checkpoint),
+ Checkpoint} || Checkpoint <- Checkpoints],
+ Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0,
+ %% do not set last written index here as the snapshot may
+ %% be for a past index
+ {State#?MODULE{first_index = SnapIdx + 1,
+ snapshot_state = SnapState}, Effects};
+ checkpoint ->
+ put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx),
+ %% If we already have the maximum allowed number of checkpoints,
+ %% remove some checkpoints to make space.
+ {SnapState, CPs} = ra_snapshot:take_extra_checkpoints(SnapState1),
+ Effects = [{delete_snapshot,
+ ra_snapshot:directory(SnapState, SnapKind),
+ CP} || CP <- CPs],
+ {State0#?MODULE{snapshot_state = SnapState}, Effects}
+ end;
+handle_event({snapshot_written, {Idx, Term} = Snap, SnapKind},
#?MODULE{cfg =#cfg{log_id = LogId},
snapshot_state = SnapState} = State0) ->
- %% if the snapshot is stale we just want to delete it
+ %% if the snapshot/checkpoint is stale we just want to delete it
Current = ra_snapshot:current(SnapState),
?INFO("~ts: old snapshot_written received for index ~b in term ~b
- current snapshot ~w, deleting old snapshot",
- [LogId, Idx, Term, Current]),
+ current snapshot ~w, deleting old ~s",
+ [LogId, Idx, Term, Current, SnapKind]),
Effects = [{delete_snapshot,
- ra_snapshot:directory(SnapState),
+ ra_snapshot:directory(SnapState, SnapKind),
Snap}],
{State0, Effects};
handle_event({resend_write, Idx}, State) ->
@@ -615,16 +648,41 @@ snapshot_index_term(#?MODULE{snapshot_state = SS}) ->
MacVersion :: ra_machine:version(),
MacState :: term(), State :: state()) ->
{state(), effects()}.
-update_release_cursor(Idx, Cluster, MacVersion, MacState,
- #?MODULE{snapshot_state = SnapState} = State) ->
- case ra_snapshot:pending(SnapState) of
+update_release_cursor(Idx, Cluster, MacVersion, MacState, State) ->
+ suggest_snapshot(snapshot, Idx, Cluster, MacVersion, MacState, State).
+
+-spec checkpoint(Idx :: ra_index(), Cluster :: ra_cluster(),
+ MacVersion :: ra_machine:version(),
+ MacState :: term(), State :: state()) ->
+ {state(), effects()}.
+checkpoint(Idx, Cluster, MacVersion, MacState, State) ->
+ suggest_snapshot(checkpoint, Idx, Cluster, MacVersion, MacState, State).
+
+suggest_snapshot(SnapKind, Idx, Cluster, MacVersion, MacState,
+ #?MODULE{snapshot_state = SnapshotState} = State) ->
+ case ra_snapshot:pending(SnapshotState) of
undefined ->
- update_release_cursor0(Idx, Cluster, MacVersion, MacState, State);
+ suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State);
_ ->
- % if a snapshot is in progress don't even evaluate
+ %% Only one snapshot or checkpoint may be written at a time to
+ %% prevent excessive I/O usage.
{State, []}
end.
+promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
+ snapshot_state = SnapState0} = State) ->
+ case ra_snapshot:pending(SnapState0) of
+ {_WriterPid, _IdxTerm, snapshot} ->
+ %% If we're currently writing a snapshot, skip promoting a
+ %% checkpoint.
+ {State, []};
+ _ ->
+ ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1),
+ {SnapState, Effects} = ra_snapshot:promote_checkpoint(Idx,
+ SnapState0),
+ {State#?MODULE{snapshot_state = SnapState}, Effects}
+ end.
+
-spec flush_cache(state()) -> state().
flush_cache(#?MODULE{cache = Cache} = State) ->
State#?MODULE{cache = ra_log_cache:flush(Cache)}.
@@ -633,28 +691,15 @@ flush_cache(#?MODULE{cache = Cache} = State) ->
needs_cache_flush(#?MODULE{cache = Cache}) ->
ra_log_cache:needs_flush(Cache).
-update_release_cursor0(Idx, Cluster, MacVersion, MacState,
- #?MODULE{cfg = #cfg{snapshot_interval = SnapInter},
- reader = Reader,
- snapshot_state = SnapState} = State0) ->
+suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) ->
ClusterServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
end, Cluster),
- SnapLimit = case ra_snapshot:current(SnapState) of
- undefined -> SnapInter;
- {I, _} -> I + SnapInter
- end,
Meta = #{index => Idx,
cluster => ClusterServerIds,
machine_version => MacVersion},
- % The release cursor index is the last entry _not_ contributing
- % to the current state. I.e. the last entry that can be discarded.
- % Check here if any segments can be release.
- case lists:any(fun({_, To, _}) -> To =< Idx end,
- ra_log_reader:segment_refs(Reader)) of
+ case should_snapshot(SnapKind, Idx, State0) of
true ->
- % segments can be cleared up
- % take a snapshot at the release_cursor
% TODO: here we use the current cluster configuration in
% the snapshot,
% _not_ the configuration at the snapshot point.
@@ -668,23 +713,40 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
% or a reference for external storage (e.g. ETS table)
case fetch_term(Idx, State0) of
{undefined, _} ->
- exit({term_not_found_for_index, Idx});
+ {State0, []};
{Term, State} ->
- write_snapshot(Meta#{term => Term}, MacState, State)
- end;
- false when Idx > SnapLimit ->
- %% periodically take snapshots even if segments cannot be cleared
- %% up
- case fetch_term(Idx, State0) of
- {undefined, State} ->
- {State, []};
- {Term, State} ->
- write_snapshot(Meta#{term => Term}, MacState, State)
+ write_snapshot(Meta#{term => Term}, MacState,
+ SnapKind, State)
end;
false ->
{State0, []}
end.
+should_snapshot(snapshot, Idx,
+ #?MODULE{cfg = #cfg{snapshot_interval = SnapInter},
+ reader = Reader,
+ snapshot_state = SnapState}) ->
+ SnapLimit = case ra_snapshot:current(SnapState) of
+ undefined -> SnapInter;
+ {I, _} -> I + SnapInter
+ end,
+ % The release cursor index is the last entry _not_ contributing
+ % to the current state. I.e. the last entry that can be discarded.
+ % We should take a snapshot if the new snapshot index would allow us
+ % to discard any segments or if we've handled enough commands
+ % since the last snapshot.
+ CanFreeSegments = lists:any(fun({_, To, _}) -> To =< Idx end,
+ ra_log_reader:segment_refs(Reader)),
+ CanFreeSegments orelse Idx > SnapLimit;
+should_snapshot(checkpoint, Idx,
+ #?MODULE{cfg = #cfg{min_checkpoint_interval = CheckpointInter},
+ snapshot_state = SnapState}) ->
+ CheckpointLimit = case ra_snapshot:latest_checkpoint(SnapState) of
+ undefined -> CheckpointInter;
+ {I, _} -> I + CheckpointInter
+ end,
+ Idx > CheckpointLimit.
+
-spec append_sync(Entry :: log_entry(), State :: state()) ->
state() | no_return().
append_sync({Idx, Term, _} = Entry, Log0) ->
@@ -734,6 +796,11 @@ overview(#?MODULE{last_index = LastIndex,
undefined -> undefined;
{I, _} -> I
end,
+ latest_checkpoint_index =>
+ case ra_snapshot:latest_checkpoint(SnapshotState) of
+ undefined -> undefined;
+ {I, _} -> I
+ end,
cache_size => ra_log_cache:size(Cache)
}.
@@ -971,11 +1038,16 @@ write_entries([{FstIdx, _, _} | Rest] = Entries, State0) ->
Error
end.
-write_snapshot(Meta, MacRef,
+write_snapshot(Meta, MacRef, SnapKind,
#?MODULE{cfg = Cfg,
snapshot_state = SnapState0} = State) ->
- ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1),
- {SnapState, Effects} = ra_snapshot:begin_snapshot(Meta, MacRef, SnapState0),
+ Counter = case SnapKind of
+ snapshot -> ?C_RA_LOG_SNAPSHOTS_WRITTEN;
+ checkpoint -> ?C_RA_LOG_CHECKPOINTS_WRITTEN
+ end,
+ ok = incr_counter(Cfg, Counter, 1),
+ {SnapState, Effects} = ra_snapshot:begin_snapshot(Meta, MacRef, SnapKind,
+ SnapState0),
{State#?MODULE{snapshot_state = SnapState}, Effects}.
recover_range(UId, Reader, SegWriter) ->
diff --git a/src/ra_log_snapshot.erl b/src/ra_log_snapshot.erl
index e9cd5f95..4e2b147c 100644
--- a/src/ra_log_snapshot.erl
+++ b/src/ra_log_snapshot.erl
@@ -11,7 +11,8 @@
-export([
prepare/2,
- write/3,
+ write/4,
+ sync/1,
begin_accept/2,
accept_chunk/2,
complete_accept/2,
@@ -42,9 +43,9 @@ prepare(_Index, State) -> State.
%% Snapshot Data (binary)
%% @end
--spec write(file:filename(), meta(), term()) ->
+-spec write(file:filename(), meta(), term(), Sync :: boolean()) ->
ok | {error, file_err()}.
-write(Dir, Meta, MacState) ->
+write(Dir, Meta, MacState, Sync) ->
%% no compression on meta data to make sure reading it is as fast
%% as possible
MetaBin = term_to_binary(Meta),
@@ -55,7 +56,13 @@ write(Dir, Meta, MacState) ->
ra_lib:write_file(File, [<>,
- Data]).
+ Data], Sync).
+
+-spec sync(file:filename()) ->
+ ok | {error, file_err()}.
+sync(Dir) ->
+ File = filename(Dir),
+ ra_lib:sync_file(File).
begin_accept(SnapDir, Meta) ->
File = filename(SnapDir),
diff --git a/src/ra_machine.erl b/src/ra_machine.erl
index ba54092a..4734019a 100644
--- a/src/ra_machine.erl
+++ b/src/ra_machine.erl
@@ -135,6 +135,8 @@
{log, [ra_index()], fun(([user_command()]) -> effects())} |
{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} |
{release_cursor, ra_index(), state()} |
+ {release_cursor, ra_index()} |
+ {checkpoint, ra_index(), state()} |
{aux, term()} |
garbage_collection.
@@ -144,8 +146,13 @@
%% forcing a GC run.
%%
%% Although both leaders and followers will process the same commands, effects
-%% are typically only applied on the leader. The only exception to this is
-%% the `release_cursor' and `garbage_collect' effects. The former is realised on all
+%% are typically only applied on the leader. The only exceptions to this are:
+%%
+%% - `release_cursor'
+%% - `checkpoint'
+%% - `garbage_collect'
+%%
+%% The first two are realised on all
%% nodes as it is a part of the Ra implementation log truncation mechanism.
%% The `garbage_collect' effects that is used to explicitly triggering a GC run
%% in the Ra servers' process.
diff --git a/src/ra_server.erl b/src/ra_server.erl
index 055a9730..e1815116 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -46,6 +46,8 @@
% TODO: hide behind a handle_leader
make_rpcs/1,
update_release_cursor/3,
+ promote_checkpoint/2,
+ checkpoint/3,
persist_last_applied/1,
update_peer/3,
register_external_log_reader/2,
@@ -1643,6 +1645,10 @@ evaluate_commit_index_follower(State, Effects) ->
filter_follower_effects(Effects) ->
lists:foldr(fun ({release_cursor, _, _} = C, Acc) ->
[C | Acc];
+ ({release_cursor, _} = C, Acc) ->
+ [C | Acc];
+ ({checkpoint, _, _} = C, Acc) ->
+ [C | Acc];
({record_leader_msg, _} = C, Acc) ->
[C | Acc];
({aux, _} = C, Acc) ->
@@ -1846,6 +1852,21 @@ update_release_cursor(Index, MacState,
MacState, Log0),
{State#{log => Log}, Effects}.
+-spec checkpoint(ra_index(), term(), ra_server_state()) ->
+ {ra_server_state(), effects()}.
+checkpoint(Index, MacState,
+ State = #{log := Log0, cluster := Cluster}) ->
+ MacVersion = index_machine_version(Index, State),
+ {Log, Effects} = ra_log:checkpoint(Index, Cluster,
+ MacVersion, MacState, Log0),
+ {State#{log => Log}, Effects}.
+
+-spec promote_checkpoint(ra_index(), ra_server_state()) ->
+ {ra_server_state(), effects()}.
+promote_checkpoint(Index, #{log := Log0} = State) ->
+ {Log, Effects} = ra_log:promote_checkpoint(Index, Log0),
+ {State#{log => Log}, Effects}.
+
% Persist last_applied - as there is an inherent race we cannot
% always guarantee that side effects won't be re-issued when a
% follower that has seen an entry but not the commit_index
diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl
index 81f4108f..7acdbecf 100644
--- a/src/ra_server_proc.erl
+++ b/src/ra_server_proc.erl
@@ -1329,6 +1329,19 @@ handle_effect(RaftState, {release_cursor, Index, MacState}, EvtType,
ServerState0),
State1 = State0#state{server_state = ServerState},
handle_effects(RaftState, Effects, EvtType, State1, Actions0);
+handle_effect(RaftState, {release_cursor, Index}, EvtType,
+ #state{server_state = ServerState0} = State0, Actions0) ->
+ incr_counter(State0#state.conf, ?C_RA_SRV_RELEASE_CURSORS, 1),
+ {ServerState, Effects} = ra_server:promote_checkpoint(Index, ServerState0),
+ State1 = State0#state{server_state = ServerState},
+ handle_effects(RaftState, Effects, EvtType, State1, Actions0);
+handle_effect(RaftState, {checkpoint, Index, MacState}, EvtType,
+ #state{server_state = ServerState0} = State0, Actions0) ->
+ incr_counter(State0#state.conf, ?C_RA_SRV_CHECKPOINTS, 1),
+ {ServerState, Effects} = ra_server:checkpoint(Index, MacState,
+ ServerState0),
+ State1 = State0#state{server_state = ServerState},
+ handle_effects(RaftState, Effects, EvtType, State1, Actions0);
handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
true = erlang:garbage_collect(),
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl
index a6a01457..98d4ae08 100644
--- a/src/ra_snapshot.erl
+++ b/src/ra_snapshot.erl
@@ -8,7 +8,7 @@
-include("ra.hrl").
--type file_err() :: file:posix() | badarg | terminated | system_limit.
+-type file_err() :: ra_lib:file_err().
%% alias
-type meta() :: snapshot_meta().
@@ -20,17 +20,17 @@
read_chunk/3,
delete/2,
- init/3,
- init/4,
+ init/6,
init_ets/0,
current/1,
pending/1,
accepting/1,
- directory/1,
+ directory/2,
last_index_for/1,
- begin_snapshot/3,
- complete_snapshot/2,
+ begin_snapshot/4,
+ promote_checkpoint/2,
+ complete_snapshot/3,
begin_accept/2,
accept_chunk/4,
@@ -39,12 +39,28 @@
context/2,
handle_down/3,
- current_snapshot_dir/1
+ current_snapshot_dir/1,
+
+ latest_checkpoint/1,
+
+ take_older_checkpoints/2,
+ take_extra_checkpoints/1
]).
-type effect() :: {monitor, process, snapshot_writer, pid()}.
--export_type([meta/0, file_err/0, effect/0, chunk_flag/0]).
+-type kind() :: snapshot | checkpoint.
+
+-type checkpoint() :: ra_idxterm().
+
+-export_type([
+ meta/0,
+ file_err/0,
+ effect/0,
+ chunk_flag/0,
+ kind/0,
+ checkpoint/0
+ ]).
-record(accept, {%% the next expected chunk
next = 1 :: non_neg_integer(),
@@ -55,14 +71,19 @@
{uid :: ra_uid(),
counter :: undefined | counters:counters_ref(),
module :: module(),
- %% the snapshot directory
%% typically /snapshots
%% snapshot subdirs are store below
%% this as /snapshots/Term_Index
- directory :: file:filename(),
- pending :: option({pid(), ra_idxterm()}),
+ snapshot_directory :: file:filename(),
+ %% /checkpoints
+ %% like snapshots, these are also stored in subdirs
+ %% as /checkpoints/Term_Index
+ checkpoint_directory :: file:filename(),
+ pending :: option({pid(), ra_idxterm(), kind()}),
accepting :: option(#accept{}),
- current :: option(ra_idxterm())}).
+ current :: option(ra_idxterm()),
+ checkpoints = [] :: list(checkpoint()),
+ max_checkpoints :: pos_integer()}).
-define(ETSTBL, ra_log_snapshot_state).
@@ -81,13 +102,20 @@
%% Saves snapshot from external state to disk.
%% Runs in a separate process.
%% External storage should be available to read
+%% `Sync' indicates whether the file should be fsync'd after writing.
-callback write(Location :: file:filename(),
Meta :: meta(),
- Ref :: term()) ->
+ Ref :: term(),
+ Sync :: boolean()) ->
ok |
{ok, Bytes :: non_neg_integer()} |
{error, file_err() | term()}.
+%% Synchronizes the snapshot to disk.
+-callback sync(Location :: file:filename()) ->
+ ok |
+ {error, file_err() | term()}.
+
%% Read the snapshot metadata and initialise a read state used in read_chunk/1
%% The read state should contain all the information required to read a chunk
@@ -139,19 +167,22 @@
-callback context() -> map().
--spec init(ra_uid(), module(), file:filename()) ->
+-spec init(ra_uid(), module(), file:filename(), file:filename(),
+ undefined | counters:counters_ref(), pos_integer()) ->
state().
-init(UId, Mod, File) ->
- init(UId, Mod, File, undefined).
-
--spec init(ra_uid(), module(), file:filename(),
- undefined | counters:counters_ref()) ->
- state().
-init(UId, Module, SnapshotsDir, Counter) ->
+init(UId, Module, SnapshotsDir, CheckpointDir, Counter, MaxCheckpoints) ->
State = #?MODULE{uid = UId,
counter = Counter,
module = Module,
- directory = SnapshotsDir},
+ snapshot_directory = SnapshotsDir,
+ checkpoint_directory = CheckpointDir,
+ max_checkpoints = MaxCheckpoints},
+ State1 = find_snapshots(State),
+ find_checkpoints(State1).
+
+find_snapshots(#?MODULE{uid = UId,
+ module = Module,
+ snapshot_directory = SnapshotsDir} = State) ->
true = ra_lib:is_dir(SnapshotsDir),
{ok, Snaps0} = prim_file:list_dir(SnapshotsDir),
Snaps = lists:reverse(lists:sort(Snaps0)),
@@ -186,6 +217,47 @@ pick_first_valid(UId, Mod, Dir, [S | Rem]) ->
pick_first_valid(UId, Mod, Dir, Rem)
end.
+find_checkpoints(#?MODULE{uid = UId,
+ module = Module,
+ current = Current,
+ checkpoint_directory = CheckpointDir} = State) ->
+ true = ra_lib:is_dir(CheckpointDir),
+ CurrentIdx = case Current of
+ undefined ->
+ -1;
+ {I, _} ->
+ I
+ end,
+ {ok, CPFiles0} = prim_file:list_dir(CheckpointDir),
+ CPFiles = lists:reverse(lists:sort(CPFiles0)),
+ Checkpoints =
+ lists:filtermap(
+ fun(File) ->
+ CP = filename:join(CheckpointDir, File),
+ case Module:validate(CP) of
+ ok ->
+ {ok, #{index := Idx, term := Term}} =
+ Module:read_meta(CP),
+ case Idx > CurrentIdx of
+ true ->
+ {true, {Idx, Term}};
+ false ->
+ ?INFO("ra_snapshot: ~ts: removing "
+ "checkpoint ~s as was older than the "
+ "current snapshot.",
+ [UId, CP]),
+ delete(CheckpointDir, {Idx, Term}),
+ false
+ end;
+ Err ->
+ ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as "
+ "did not validate. Err: ~w",
+ [UId, CP, Err]),
+ ra_lib:recursive_delete(CP),
+ false
+ end
+ end, CPFiles),
+ State#?MODULE{checkpoints = Checkpoints}.
-spec init_ets() -> ok.
init_ets() ->
@@ -200,7 +272,11 @@ init_ets() ->
-spec current(state()) -> option(ra_idxterm()).
current(#?MODULE{current = Current}) -> Current.
--spec pending(state()) -> option({pid(), ra_idxterm()}).
+-spec latest_checkpoint(state()) -> option(checkpoint()).
+latest_checkpoint(#?MODULE{checkpoints = [Current | _]}) -> Current;
+latest_checkpoint(#?MODULE{checkpoints = _}) -> undefined.
+
+-spec pending(state()) -> option({pid(), ra_idxterm(), kind()}).
pending(#?MODULE{pending = Pending}) ->
Pending.
@@ -210,8 +286,9 @@ accepting(#?MODULE{accepting = undefined}) ->
accepting(#?MODULE{accepting = #accept{idxterm = Accepting}}) ->
Accepting.
--spec directory(state()) -> file:filename().
-directory(#?MODULE{directory = Dir}) -> Dir.
+-spec directory(state(), kind()) -> file:filename().
+directory(#?MODULE{snapshot_directory = Dir}, snapshot) -> Dir;
+directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir.
-spec last_index_for(ra_uid()) -> option(ra_index()).
last_index_for(UId) ->
@@ -220,12 +297,23 @@ last_index_for(UId) ->
[{_, Index}] -> Index
end.
--spec begin_snapshot(meta(), ReleaseCursorRef :: term(), state()) ->
+-spec begin_snapshot(meta(), ReleaseCursorRef :: term(), kind(), state()) ->
{state(), [effect()]}.
-begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef,
+begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind,
#?MODULE{module = Mod,
counter = Counter,
- directory = Dir} = State) ->
+ snapshot_directory = SnapshotDir,
+ checkpoint_directory = CheckpointDir} = State) ->
+ {CounterIdx, Dir} =
+ case SnapKind of
+ snapshot ->
+ {?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, SnapshotDir};
+ checkpoint ->
+ {?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, CheckpointDir}
+ end,
+ %% Snapshots must be fsync'd but checkpoints are OK to not sync.
+ %% Checkpoints are fsync'd before promotion instead.
+ Sync = SnapKind =:= snapshot,
%% create directory for this snapshot
SnapDir = make_snapshot_dir(Dir, Idx, Term),
%% call prepare then write_snapshot
@@ -236,39 +324,92 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef,
Self = self(),
Pid = spawn(fun () ->
ok = ra_lib:make_dir(SnapDir),
- case Mod:write(SnapDir, Meta, Ref) of
+ case Mod:write(SnapDir, Meta, Ref, Sync) of
ok -> ok;
{ok, BytesWritten} ->
- counters_add(Counter,
- ?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN,
+ counters_add(Counter, CounterIdx,
BytesWritten),
ok
end,
Self ! {ra_log_event,
- {snapshot_written, {Idx, Term}}},
+ {snapshot_written, {Idx, Term}, SnapKind}},
ok
end),
%% record snapshot in progress
%% emit an effect that monitors the current snapshot attempt
- {State#?MODULE{pending = {Pid, {Idx, Term}}},
+ {State#?MODULE{pending = {Pid, {Idx, Term}, SnapKind}},
[{monitor, process, snapshot_writer, Pid}]}.
--spec complete_snapshot(ra_idxterm(), state()) ->
+-spec promote_checkpoint(Idx :: ra_index(), State0 :: state()) ->
+ {State :: state(), Effects :: [effect()]}.
+promote_checkpoint(PromotionIdx,
+ #?MODULE{module = Mod,
+ snapshot_directory = SnapDir,
+ checkpoint_directory = CheckpointDir,
+ checkpoints = Checkpoints0} = State0) ->
+ %% Find the checkpoint with the highest index smaller than or equal to the
+ %% given `PromotionIdx' and rename the checkpoint directory to the snapshot
+ %% directory.
+ case find_promotable_checkpoint(PromotionIdx, Checkpoints0, []) of
+ {Checkpoints, {Idx, Term}} ->
+ Checkpoint = make_snapshot_dir(CheckpointDir, Idx, Term),
+ Snapshot = make_snapshot_dir(SnapDir, Idx, Term),
+ Self = self(),
+ Pid = spawn(fun() ->
+ %% Checkpoints are created without calling
+ %% fsync. Snapshots must be fsync'd though, so
+ %% sync the checkpoint before promoting it
+ %% into a snapshot.
+ ok = Mod:sync(Checkpoint),
+ ok = file:rename(Checkpoint, Snapshot),
+ Self ! {ra_log_event,
+ {snapshot_written,
+ {Idx, Term}, snapshot}}
+ end),
+ State = State0#?MODULE{pending = {Pid, {Idx, Term}, snapshot},
+ checkpoints = Checkpoints},
+ {State, [{monitor, process, snapshot_writer, Pid}]};
+ undefined ->
+ {State0, []}
+ end.
+
+%% Find the first checkpoint smaller than or equal to the promotion index and
+%% remove it from the checkpoint list.
+-spec find_promotable_checkpoint(PromotionIdx, Checkpoints, Acc) -> Result
+ when
+ PromotionIdx :: ra_index(),
+ Checkpoints :: [ra_idxterm()],
+ Acc :: [ra_idxterm()],
+ Result :: option({[ra_idxterm()], ra_idxterm()}).
+find_promotable_checkpoint(Idx, [{CPIdx, _} = CP | Rest], Acc)
+ when CPIdx =< Idx ->
+ %% Checkpoints are sorted by index descending so the first checkpoint
+ %% with an index smaller than or equal to the promotion index is the proper
+ %% checkpoint to promote.
+ {lists:reverse(Rest, Acc), CP};
+find_promotable_checkpoint(Idx, [CP | Rest], Acc) ->
+ find_promotable_checkpoint(Idx, Rest, [CP | Acc]);
+find_promotable_checkpoint(_Idx, [], _Acc) ->
+ undefined.
+
+-spec complete_snapshot(ra_idxterm(), kind(), state()) ->
state().
-complete_snapshot({Idx, _} = IdxTerm,
- #?MODULE{uid = UId,
- module = _Mod,
- directory = _Dir} = State) ->
+complete_snapshot({Idx, _} = IdxTerm, snapshot,
+ #?MODULE{uid = UId} = State) ->
true = ets:insert(?ETSTBL, {UId, Idx}),
State#?MODULE{pending = undefined,
- current = IdxTerm}.
+ current = IdxTerm};
+complete_snapshot(IdxTerm, checkpoint,
+ #?MODULE{checkpoints = Checkpoints0} = State) ->
+ State#?MODULE{pending = undefined,
+ checkpoints = [IdxTerm | Checkpoints0]}.
-spec begin_accept(meta(), state()) ->
{ok, state()}.
begin_accept(#{index := Idx, term := Term} = Meta,
#?MODULE{module = Mod,
- directory = Dir} = State) ->
+ snapshot_directory = Dir} = State) ->
SnapDir = make_snapshot_dir(Dir, Idx, Term),
ok = ra_lib:make_dir(SnapDir),
{ok, AcceptState} = Mod:begin_accept(SnapDir, Meta),
@@ -280,7 +421,7 @@ begin_accept(#{index := Idx, term := Term} = Meta,
accept_chunk(Chunk, Num, last,
#?MODULE{uid = UId,
module = Mod,
- directory = Dir,
+ snapshot_directory = Dir,
current = Current,
accepting = #accept{next = Num,
idxterm = {Idx, _} = IdxTerm,
@@ -314,7 +455,7 @@ accept_chunk(_Chunk, Num, _ChunkFlag,
abort_accept(#?MODULE{accepting = undefined} = State) ->
State;
abort_accept(#?MODULE{accepting = #accept{idxterm = {Idx, Term}},
- directory = Dir} = State) ->
+ snapshot_directory = Dir} = State) ->
ok = delete(Dir, {Idx, Term}),
State#?MODULE{accepting = undefined}.
@@ -342,9 +483,14 @@ handle_down(_Pid, noproc, State) ->
%% finished
State;
handle_down(Pid, _Info,
- #?MODULE{directory = Dir,
- pending = {Pid, IdxTerm}} = State) ->
- %% delete the pending snapshot directory
+ #?MODULE{snapshot_directory = SnapshotDir,
+ checkpoint_directory = CheckpointDir,
+ pending = {Pid, IdxTerm, SnapKind}} = State) ->
+ %% delete the pending snapshot/checkpoint directory
+ Dir = case SnapKind of
+ snapshot -> SnapshotDir;
+ checkpoint -> CheckpointDir
+ end,
ok = delete(Dir, IdxTerm),
State#?MODULE{pending = undefined}.
@@ -359,7 +505,7 @@ delete(Dir, {Idx, Term}) ->
{ok, Meta :: meta(), ReadState} |
{error, term()} when ReadState :: term().
begin_read(#?MODULE{module = Mod,
- directory = Dir,
+ snapshot_directory = Dir,
current = {Idx, Term}},
Context) when is_map(Context) ->
Location = make_snapshot_dir(Dir, Idx, Term),
@@ -371,23 +517,35 @@ begin_read(#?MODULE{module = Mod,
{ok, Data :: term(), {next, ReadState} | last} |
{error, term()} when ReadState :: term().
read_chunk(ReadState, ChunkSizeBytes, #?MODULE{module = Mod,
- directory = Dir,
+ snapshot_directory = Dir,
current = {Idx, Term}}) ->
%% TODO: do we need to generate location for every chunk?
Location = make_snapshot_dir(Dir, Idx, Term),
Mod:read_chunk(ReadState, ChunkSizeBytes, Location).
+%% Recovers from the snapshot or checkpoint with the highest index, if any.
-spec recover(state()) ->
{ok, Meta :: meta(), State :: term()} |
{error, no_current_snapshot} |
{error, term()}.
-recover(#?MODULE{current = undefined}) ->
+recover(#?MODULE{current = undefined, checkpoints = []}) ->
{error, no_current_snapshot};
recover(#?MODULE{module = Mod,
- directory = Dir,
- current = {Idx, Term}}) ->
- SnapDir = make_snapshot_dir(Dir, Idx, Term),
- Mod:recover(SnapDir).
+ current = Snapshot,
+ snapshot_directory = SnapDir,
+ checkpoints = Checkpoints,
+ checkpoint_directory = CheckpointDir}) ->
+ %% If there are checkpoints and a snapshot, recover from whichever has the
+ %% highest index. Otherwise recover from whichever exists.
+ Dir = case {Snapshot, Checkpoints} of
+ {{SnapIdx, _}, [{CPIdx, CPTerm} | _]} when CPIdx > SnapIdx ->
+ make_snapshot_dir(CheckpointDir, CPIdx, CPTerm);
+ {{Idx, Term}, _} ->
+ make_snapshot_dir(SnapDir, Idx, Term);
+ {undefined, [{Idx, Term} | _]} ->
+ make_snapshot_dir(CheckpointDir, Idx, Term)
+ end,
+ Mod:recover(Dir).
-spec read_meta(Module :: module(), Location :: file:filename()) ->
{ok, meta()} |
@@ -401,12 +559,36 @@ read_meta(Module, Location) ->
-spec current_snapshot_dir(state()) ->
option(file:filename()).
-current_snapshot_dir(#?MODULE{directory = Dir,
+current_snapshot_dir(#?MODULE{snapshot_directory = Dir,
current = {Idx, Term}}) ->
make_snapshot_dir(Dir, Idx, Term);
current_snapshot_dir(_) ->
undefined.
+-spec take_older_checkpoints(ra_index(), state()) ->
+ {state(), [checkpoint()]}.
+take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) ->
+ {Checkpoints, Outdated} = lists:splitwith(fun ({CPIdx, _Term}) ->
+ CPIdx > Idx
+ end, Checkpoints0),
+ {State0#?MODULE{checkpoints = Checkpoints}, Outdated}.
+
+-spec take_extra_checkpoints(state()) ->
+ {state(), [checkpoint()]}.
+take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0,
+ max_checkpoints = MaxCheckpoints} = State0) ->
+ Len = erlang:length(Checkpoints0),
+ case Len - MaxCheckpoints of
+ ToDelete when ToDelete > 0 ->
+ %% Take `ToDelete' checkpoints from the list randomly without
+ %% ever taking the first or last checkpoint.
+ IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete),
+ {Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake),
+ {State0#?MODULE{checkpoints = Checkpoints}, Extras};
+ _ ->
+ {State0, []}
+ end.
+
%% Utility
make_snapshot_dir(Dir, Index, Term) ->
@@ -418,3 +600,66 @@ counters_add(undefined, _, _) ->
ok;
counters_add(Counter, Ix, Incr) ->
counters:add(Counter, Ix, Incr).
+
+random_idxs_to_take(Max, N) ->
+ %% Always retain the first and last elements.
+ AllIdxs = lists:seq(2, Max - 1),
+ %% Take a random subset of those indices of length N.
+ lists:sublist(ra_lib:lists_shuffle(AllIdxs), N).
+
+%% Take items from the given list by the given indices without disturbing the
+%% order of the list.
+-spec lists_take_idxs(List, Idxs) -> {List1, Taken} when
+ List :: list(Elem),
+ Elem :: any(),
+ Idxs :: list(pos_integer()),
+ List1 :: list(Elem),
+ Taken :: list(Elem).
+lists_take_idxs(List, Idxs0) ->
+ %% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists
+ Idxs = lists:sort(Idxs0),
+ %% 1-indexing like the `lists' module.
+ lists_take_idxs(List, Idxs, 1, [], []).
+
+lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) ->
+ lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc);
+lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) ->
+ lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]);
+lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) ->
+ {lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)};
+lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) ->
+ {lists:reverse(ElemAcc), lists:reverse(TakeAcc)}.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+random_idxs_to_take_test() ->
+ Idxs = random_idxs_to_take(10, 3),
+ ?assertEqual(3, length(Idxs)),
+ [Min, _, Max] = lists:sort(Idxs),
+ %% The first and last elements are excluded.
+ ?assert(Min > 1),
+ ?assert(Max < 10),
+ ok.
+
+lists_take_idxs_test() ->
+ ?assertEqual(
+ {[1, 3, 5, 7, 8], [2, 4, 6]},
+ lists_take_idxs(lists:seq(1, 8), [2, 4, 6])),
+
+ %% Ordering of `Idxs' doesn't matter.
+ ?assertEqual(
+ {[1, 3, 5, 7, 8], [2, 4, 6]},
+ lists_take_idxs(lists:seq(1, 8), [4, 6, 2])),
+
+ ?assertEqual(
+ {[a, c], [b]},
+ lists_take_idxs([a, b, c], [2])),
+
+ %% `List''s order is preserved even when nothing is taken.
+ ?assertEqual(
+ {[a, b, c], []},
+ lists_take_idxs([a, b, c], [])),
+ ok.
+
+-endif.
diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl
index 269aef8b..4a98a6ac 100644
--- a/test/coordination_SUITE.erl
+++ b/test/coordination_SUITE.erl
@@ -47,7 +47,8 @@ all_tests() ->
leaderboard,
bench,
disconnected_node_catches_up,
- key_metrics
+ key_metrics,
+ recover_from_checkpoint
].
groups() ->
@@ -708,6 +709,86 @@ bench(Config) ->
ra_lib:recursive_delete(PrivDir),
ok.
+recover_from_checkpoint(Config) ->
+ PrivDir = ?config(data_dir, Config),
+ ClusterName = ?config(cluster_name, Config),
+ ServerNames = [s1, s2, s3],
+ ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames],
+ Configs = [begin
+ UId = atom_to_binary(Name, utf8),
+ #{cluster_name => ClusterName,
+ id => NodeId,
+ uid => UId,
+ initial_members => ServerIds,
+ machine => {module, ?MODULE, #{}},
+ log_init_args => #{uid => UId,
+ min_checkpoint_interval => 3,
+ snapshot_interval => 5}}
+ end || {Name, _Node} = NodeId <- ServerIds],
+ {ok, Started, []} = ra:start_cluster(?SYS, Configs),
+ {ok, _, Leader} = ra:members(hd(Started)),
+ [Follower1, Follower2] = ServerIds -- [Leader],
+
+ %% Send five commands to trigger a snapshot.
+ [ok = ra:pipeline_command(Leader, N, no_correlation, normal)
+ || N <- lists:seq(1, 6)],
+ await_condition(
+ fun () ->
+ {ok, #{log := #{snapshot_index := LeaderIdx}}, _} =
+ ra:member_overview(Leader),
+ {ok, #{log := #{snapshot_index := Follower1Idx}}, _} =
+ ra:member_overview(Follower1),
+ {ok, #{log := #{snapshot_index := Follower2Idx}}, _} =
+ ra:member_overview(Follower2),
+ LeaderIdx =:= 6 andalso Follower1Idx =:= 6 andalso
+ Follower2Idx =:= 6
+ end, 20),
+
+ %% Trigger a checkpoint.
+ {ok, _, _} = ra:process_command(Leader, checkpoint),
+ await_condition(
+ fun () ->
+ {ok, #{log := #{latest_checkpoint_index := LeaderIdx}}, _} =
+ ra:member_overview(Leader),
+ {ok, #{log := #{latest_checkpoint_index := Follower1Idx}}, _} =
+ ra:member_overview(Follower1),
+ {ok, #{log := #{latest_checkpoint_index := Follower2Idx}}, _} =
+ ra:member_overview(Follower2),
+ LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
+ Follower2Idx =:= 8
+ end, 20),
+
+ %% Restart the servers
+ [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
+ [ok = ra:restart_server(?SYS, ServerId) || ServerId <- ServerIds],
+
+ %% All servers should have recovered from their checkpoints since the
+ %% checkpoint has a higher index than the snapshot.
+ [{ok, {_CurrentIdx, _CheckpointIdx = 8}, _Leader} =
+ ra:local_query(ServerId, fun(State) ->
+ maps:get(checkpoint_index, State,
+ undefined)
+ end) || ServerId <- ServerIds],
+
+ %% Promote the checkpoint into a snapshot.
+ {ok, _, _} = ra:process_command(Leader, promote_checkpoint),
+ await_condition(
+ fun () ->
+ {ok, #{log := #{snapshot_index := LeaderIdx}}, _} =
+ ra:member_overview(Leader),
+ {ok, #{log := #{snapshot_index := Follower1Idx}}, _} =
+ ra:member_overview(Follower1),
+ {ok, #{log := #{snapshot_index := Follower2Idx}}, _} =
+ ra:member_overview(Follower2),
+ LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
+ Follower2Idx =:= 8
+ end, 20),
+
+ [ok = slave:stop(S) || {_, S} <- ServerIds],
+ ok.
+
+%% Utility
+
test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
Opts = case Opts0 of
local -> [local];
@@ -761,8 +842,6 @@ test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
flush(),
ok.
-%% Utility
-
get_current_host() ->
NodeStr = atom_to_list(node()),
Host = re:replace(NodeStr, "^[^@]+@", "", [{return, list}]),
@@ -802,7 +881,7 @@ flush() ->
%% ra_machine impl
init(_) ->
- {#{}, []}.
+ #{}.
apply(_Meta, {send_local_msg, Pid, Opts}, State) ->
{State, ok, [{send_msg, Pid, {local_msg, node()}, Opts}]};
@@ -815,6 +894,15 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) ->
{State, ok, [Eff]};
apply(#{index := _Idx}, {data, _}, State) ->
{State, ok, []};
+apply(#{index := Idx}, checkpoint, State) ->
+ %% Generally machines should save their state without any modifications
+ %% but we slightly modify the machine state we save in the checkpoint here
+ %% so that we can tell when we've recovered from a checkpoint rather than
+ %% a snapshot.
+ CheckpointState = maps:put(checkpoint_index, Idx, State),
+ {State, ok, [{checkpoint, Idx, CheckpointState}]};
+apply(#{index := Idx}, promote_checkpoint, State) ->
+ {State, ok, [{release_cursor, Idx}]};
apply(#{index := Idx}, _Cmd, State) ->
{State, ok, [{release_cursor, Idx, State}]}.
diff --git a/test/ra_checkpoint_SUITE.erl b/test/ra_checkpoint_SUITE.erl
new file mode 100644
index 00000000..bfcdfe28
--- /dev/null
+++ b/test/ra_checkpoint_SUITE.erl
@@ -0,0 +1,357 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%%
+-module(ra_checkpoint_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("src/ra.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ init_empty,
+ take_checkpoint,
+ take_checkpoint_crash,
+ recover_from_checkpoint_only,
+ recover_from_checkpoint_and_snapshot,
+ newer_snapshot_deletes_older_checkpoints,
+ init_recover_corrupt,
+ init_recover_multi_corrupt
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(TestCase, Config) ->
+ ok = ra_snapshot:init_ets(),
+ SnapDir = filename:join([?config(priv_dir, Config),
+ TestCase, "snapshots"]),
+ CheckpointDir = filename:join([?config(priv_dir, Config),
+ TestCase, "checkpoints"]),
+ ok = ra_lib:make_dir(SnapDir),
+ ok = ra_lib:make_dir(CheckpointDir),
+ [{uid, ra_lib:to_binary(TestCase)},
+ {snap_dir, SnapDir},
+ {checkpoint_dir, CheckpointDir},
+ {max_checkpoints, ?DEFAULT_MAX_CHECKPOINTS} | Config].
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+init_empty(Config) ->
+ State = init_state(Config),
+ undefined = ra_snapshot:latest_checkpoint(State),
+
+ ok.
+
+take_checkpoint(Config) ->
+ State0 = init_state(Config),
+
+ Meta = meta(55, 2, [node()]),
+ MacRef = ?FUNCTION_NAME,
+ {State1, [{monitor, process, snapshot_writer, Pid}]} =
+ ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0),
+ undefined = ra_snapshot:latest_checkpoint(State1),
+ {Pid, {55, 2}, checkpoint} = ra_snapshot:pending(State1),
+ receive
+ {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} ->
+ State = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1),
+ undefined = ra_snapshot:pending(State),
+ {55, 2} = ra_snapshot:latest_checkpoint(State),
+ ok
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ ok.
+
+take_checkpoint_crash(Config) ->
+ State0 = init_state(Config),
+ Meta = meta(55, 2, [node()]),
+ MacRef = ?FUNCTION_NAME,
+ {State1, [{monitor, process, snapshot_writer, Pid}]} =
+ ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0),
+ undefined = ra_snapshot:latest_checkpoint(State1),
+ {Pid, {55, 2}, checkpoint} = ra_snapshot:pending(State1),
+ receive
+ {ra_log_event, _} ->
+ %% Just pretend the snapshot event didn't happen
+ %% and the process instead crashed.
+ ok
+ after 10 -> ok
+ end,
+
+ State = ra_snapshot:handle_down(Pid, it_crashed_dawg, State1),
+ %% If the checkpoint process crashed we just have to consider the
+ %% checkpoint as faulty and clear it up.
+ undefined = ra_snapshot:pending(State),
+ undefined = ra_snapshot:latest_checkpoint(State),
+
+ %% The written checkpoint should be removed.
+ ?assertEqual([], list_checkpoint_dirs(Config)),
+
+ ok.
+
+recover_from_checkpoint_only(Config) ->
+ State0 = init_state(Config),
+ {error, no_current_snapshot} = ra_snapshot:recover(State0),
+
+ Meta = meta(55, 2, [node()]),
+ {State1, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, checkpoint, State0),
+ receive
+ {ra_log_event, {snapshot_written, IdxTerm, checkpoint}} ->
+ _ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1),
+ ok
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Open a new snapshot state to simulate a restart.
+ Recover = init_state(Config),
+ undefined = ra_snapshot:pending(Recover),
+ {55, 2} = ra_snapshot:latest_checkpoint(Recover),
+ undefined = ra_snapshot:current(Recover),
+
+ {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
+
+ ok.
+
+recover_from_checkpoint_and_snapshot(Config) ->
+ State0 = init_state(Config),
+ {error, no_current_snapshot} = ra_snapshot:recover(State0),
+
+ %% Snapshot.
+ SnapMeta = meta(55, 2, [node()]),
+ {State1, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(SnapMeta, ?FUNCTION_NAME, snapshot, State0),
+ State2 = receive
+ {ra_log_event, {snapshot_written, IdxTerm1, snapshot}} ->
+ ra_snapshot:complete_snapshot(IdxTerm1, snapshot, State1)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Checkpoint at a later index.
+ CPMeta = meta(105, 3, [node()]),
+ {State3, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(CPMeta, ?FUNCTION_NAME, checkpoint, State2),
+ receive
+ {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} ->
+ _ = ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3),
+ ok
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Open a new snapshot state to simulate a restart.
+ Recover = init_state(Config),
+ undefined = ra_snapshot:pending(Recover),
+ %% Both the checkpoint and the snapshot exist.
+ {105, 3} = ra_snapshot:latest_checkpoint(Recover),
+ {55, 2} = ra_snapshot:current(Recover),
+ %% The checkpoint is used for recovery since it is newer.
+ {ok, CPMeta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
+
+ ok.
+
+newer_snapshot_deletes_older_checkpoints(Config) ->
+ State0 = init_state(Config),
+ {error, no_current_snapshot} = ra_snapshot:recover(State0),
+
+ %% Checkpoint at 25.
+ CP1Meta = meta(25, 2, [node()]),
+ {State1, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(CP1Meta, ?FUNCTION_NAME, checkpoint, State0),
+ State2 = receive
+ {ra_log_event, {snapshot_written, IdxTerm1, checkpoint}} ->
+ ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Checkpoint at 35.
+ CP2Meta = meta(35, 3, [node()]),
+ {State3, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(CP2Meta, ?FUNCTION_NAME, checkpoint, State2),
+ State4 = receive
+ {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} ->
+ ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Checkpoint at 55.
+ CP3Meta = meta(55, 5, [node()]),
+ {State5, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(CP3Meta, ?FUNCTION_NAME, checkpoint, State4),
+ State6 = receive
+ {ra_log_event, {snapshot_written, IdxTerm3, checkpoint}} ->
+ ra_snapshot:complete_snapshot(IdxTerm3, checkpoint, State5)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Snapshot at 45.
+ SnapMeta = meta(45, 4, [node()]),
+ {State7, [{monitor, process, snapshot_writer, _}]} =
+ ra_snapshot:begin_snapshot(SnapMeta, ?FUNCTION_NAME, snapshot, State6),
+ State8 = receive
+ {ra_log_event, {snapshot_written, IdxTerm4, snapshot}} ->
+ ra_snapshot:complete_snapshot(IdxTerm4, snapshot, State7)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% The first and second checkpoint are older than the snapshot.
+ {_State, [{35, 3}, {25, 2}]} =
+ ra_snapshot:take_older_checkpoints(45, State8),
+
+ %% Open a new snapshot state to simulate a restart.
+ Recover = init_state(Config),
+ undefined = ra_snapshot:pending(Recover),
+ %% Both the latest checkpoint and the snapshot exist.
+ {55, 5} = ra_snapshot:latest_checkpoint(Recover),
+ {45, 4} = ra_snapshot:current(Recover),
+ %% The latest checkpoint has the highest index so it is used for recovery.
+ {ok, CP3Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
+
+ %% Initializing the state removes any checkpoints older than the snapshot,
+ %% so there should be one snapshot and one checkpoint only.
+ ?assertMatch([_], list_snap_dirs(Config)),
+ ?assertMatch([_], list_checkpoint_dirs(Config)),
+
+ ok.
+
+init_recover_corrupt(Config) ->
+ State0 = init_state(Config),
+
+ %% Take a checkpoint.
+ Meta = meta(55, 2, [node()]),
+ MacRef = ?FUNCTION_NAME,
+ {State1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0),
+ receive
+ {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} ->
+ _ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1),
+ ok
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Delete the file but leave the directory intact.
+ CorruptDir = filename:join(?config(checkpoint_dir, Config),
+ ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(55)),
+ ok = file:delete(filename:join(CorruptDir, "snapshot.dat")),
+
+ Recover = init_state(Config),
+ %% The checkpoint isn't recovered and the directory is cleaned up.
+ undefined = ra_snapshot:pending(Recover),
+ undefined = ra_snapshot:current(Recover),
+ undefined = ra_snapshot:latest_checkpoint(Recover),
+ {error, no_current_snapshot} = ra_snapshot:recover(Recover),
+ false = filelib:is_dir(CorruptDir),
+
+ ok.
+
+init_recover_multi_corrupt(Config) ->
+ State0 = init_state(Config),
+ {error, no_current_snapshot} = ra_snapshot:recover(State0),
+
+ %% Checkpoint at 55.
+ CP1Meta = meta(55, 2, [node()]),
+ {State1, _} =
+ ra_snapshot:begin_snapshot(CP1Meta, ?FUNCTION_NAME, checkpoint, State0),
+ State2 = receive
+ {ra_log_event, {snapshot_written, IdxTerm1, checkpoint}} ->
+ ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+
+ %% Checkpoint at 165.
+ CP2Meta = meta(165, 2, [node()]),
+ {State3, _} =
+ ra_snapshot:begin_snapshot(CP2Meta, ?FUNCTION_NAME, checkpoint, State2),
+ State4 = receive
+ {ra_log_event, {snapshot_written, IdxTerm2, checkpoint}} ->
+ ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3)
+ after 1000 ->
+ error(snapshot_event_timeout)
+ end,
+ {165, 2} = ra_snapshot:latest_checkpoint(State4),
+
+ %% Corrupt the latest checkpoint.
+ Corrupt = filename:join(?config(checkpoint_dir, Config),
+ ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(165)),
+ ok = file:delete(filename:join(Corrupt, "snapshot.dat")),
+
+ %% Open a new snapshot state to simulate a restart.
+ Recover = init_state(Config),
+ undefined = ra_snapshot:pending(Recover),
+ %% The latest non-corrupt checkpoint is now the latest checkpoint.
+ {55, 2} = ra_snapshot:latest_checkpoint(Recover),
+ %% The corrupt checkpoint is cleaned up.
+ false = filelib:is_dir(Corrupt),
+
+ {ok, CP1Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
+
+ ok.
+
+%%%===================================================================
+%%% Helper functions
+%%%===================================================================
+
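+%% Initialise a ra_snapshot state using the snapshot and checkpoint
+%% directories and the max_checkpoints limit from the test config.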
+init_state(Config) ->
+ ra_snapshot:init(?config(uid, Config),
+ ra_log_snapshot,
+ ?config(snap_dir, Config),
+ ?config(checkpoint_dir, Config),
+ undefined, ?config(max_checkpoints, Config)).
+
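+%% Minimal snapshot metadata for the given index, term and cluster.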
+meta(Idx, Term, Cluster) ->
+ #{index => Idx,
+ term => Term,
+ cluster => Cluster,
+ machine_version => 1}.
+
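+%% List the on-disk checkpoint directories created by the test.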
+list_checkpoint_dirs(Config) ->
+ CPDir = ?config(checkpoint_dir, Config),
+ filelib:wildcard(filename:join(CPDir, "*")).
+
+list_snap_dirs(Config) ->
+ SnapDir = ?config(snap_dir, Config),
+ filelib:wildcard(filename:join(SnapDir, "*")).
diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl
index d25eb4b1..9a8cdd89 100644
--- a/test/ra_log_2_SUITE.erl
+++ b/test/ra_log_2_SUITE.erl
@@ -311,7 +311,8 @@ sparse_read_out_of_range_2(Config) ->
{Log2, _} = ra_log:update_release_cursor(SnapIdx, #{}, 2,
<<"snap@10">>, Log1),
{Log3, _} = receive
- {ra_log_event, {snapshot_written, {10, 2}} = Evt} ->
+ {ra_log_event, {snapshot_written, {10, 2},
+ snapshot} = Evt} ->
ra_log:handle_event(Evt, Log2)
after 5000 ->
flush(),
@@ -397,7 +398,8 @@ written_event_after_snapshot(Config) ->
{Log2, _} = ra_log:update_release_cursor(2, #{}, 1,
<<"one+two">>, Log1b),
{Log3, _} = receive
- {ra_log_event, {snapshot_written, {2, 1}} = Evt} ->
+ {ra_log_event, {snapshot_written, {2, 1},
+ snapshot} = Evt} ->
ra_log:handle_event(Evt, Log2)
after 500 ->
exit(snapshot_written_timeout)
@@ -412,7 +414,7 @@ written_event_after_snapshot(Config) ->
<<"one+two+three+four">>,
Log6b),
_ = receive
- {ra_log_event, {snapshot_written, {4, 1}} = E} ->
+ {ra_log_event, {snapshot_written, {4, 1}, snapshot} = E} ->
ra_log:handle_event(E, Log7)
after 500 ->
exit(snapshot_written_timeout)
@@ -699,7 +701,8 @@ snapshot_written_after_installation(Config) ->
{Log2, _} = ra_log:update_release_cursor(5, #{}, 1,
<<"one-five">>, Log1),
DelayedSnapWritten = receive
- {ra_log_event, {snapshot_written, {5, 1}} = Evt} ->
+ {ra_log_event, {snapshot_written, {5, 1},
+ snapshot} = Evt} ->
Evt
after 1000 ->
flush(),
@@ -1363,15 +1366,17 @@ meta(Idx, Term, Cluster) ->
create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) ->
OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"),
+ CPDir = filename:join(?config(priv_dir, Config), "checkpoints"),
ok = ra_lib:make_dir(OthDir),
+ ok = ra_lib:make_dir(CPDir),
Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot,
- OthDir),
+ OthDir, CPDir, undefined, ?DEFAULT_MAX_CHECKPOINTS),
MacRef = <<"9">>,
- {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, Sn0),
+ {Sn1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, Sn0),
Sn2 =
receive
- {ra_log_event, {snapshot_written, {Idx, 2} = IdxTerm}} ->
- ra_snapshot:complete_snapshot(IdxTerm, Sn1)
+ {ra_log_event, {snapshot_written, {Idx, 2} = IdxTerm, snapshot}} ->
+ ra_snapshot:complete_snapshot(IdxTerm, snapshot, Sn1)
after 1000 ->
exit(snapshot_timeout)
end,
diff --git a/test/ra_log_snapshot_SUITE.erl b/test/ra_log_snapshot_SUITE.erl
index b7188955..adfd560c 100644
--- a/test/ra_log_snapshot_SUITE.erl
+++ b/test/ra_log_snapshot_SUITE.erl
@@ -73,7 +73,7 @@ roundtrip(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
- ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
+ ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => true},
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)),
ok.
@@ -82,7 +82,7 @@ roundtrip_compat(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
- ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
+ ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)),
ok.
@@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) ->
ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = crypto:strong_rand_bytes(DataSize),
- ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
+ ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
Context = #{can_accept_full_file => FullFile},
{ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context),
%% how to ensure
@@ -180,7 +180,7 @@ read_meta_data(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotRef = my_state,
- ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef),
+ ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true),
{ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir),
ok.
@@ -188,7 +188,7 @@ recover_same_as_read(Config) ->
Dir = ?config(dir, Config),
SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]),
SnapshotData = my_state,
- ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData),
+ ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true),
{ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir),
ok.
diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl
index 1298c618..cf078b5b 100644
--- a/test/ra_snapshot_SUITE.erl
+++ b/test/ra_snapshot_SUITE.erl
@@ -14,6 +14,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include("src/ra.hrl").
%%%===================================================================
%%% Common Test callbacks
@@ -62,9 +63,14 @@ init_per_testcase(TestCase, Config) ->
ok = ra_snapshot:init_ets(),
SnapDir = filename:join([?config(priv_dir, Config),
TestCase, "snapshots"]),
+ CheckpointDir = filename:join([?config(priv_dir, Config),
+ TestCase, "checkpoints"]),
ok = ra_lib:make_dir(SnapDir),
+ ok = ra_lib:make_dir(CheckpointDir),
[{uid, ra_lib:to_binary(TestCase)},
- {snap_dir, SnapDir} | Config].
+ {snap_dir, SnapDir},
+ {checkpoint_dir, CheckpointDir},
+ {max_checkpoints, ?DEFAULT_MAX_CHECKPOINTS} | Config].
end_per_testcase(_TestCase, _Config) ->
ok.
@@ -75,7 +81,7 @@ end_per_testcase(_TestCase, _Config) ->
init_empty(Config) ->
UId = ?config(uid, Config),
- State = ra_snapshot:init(UId, ?MODULE, ?config(snap_dir, Config), undefined),
+ State = init_state(Config),
%% no pending, no current
undefined = ra_snapshot:current(State),
undefined = ra_snapshot:pending(State),
@@ -85,17 +91,16 @@ init_empty(Config) ->
take_snapshot(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
MacRef = ?FUNCTION_NAME,
{State1, [{monitor, process, snapshot_writer, Pid}]} =
- ra_snapshot:begin_snapshot(Meta, MacRef, State0),
+ ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0),
undefined = ra_snapshot:current(State1),
- {Pid, {55, 2}} = ra_snapshot:pending(State1),
+ {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1),
receive
- {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} ->
- State = ra_snapshot:complete_snapshot(IdxTerm, State1),
+ {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} ->
+ State = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1),
undefined = ra_snapshot:pending(State),
{55, 2} = ra_snapshot:current(State),
55 = ra_snapshot:last_index_for(UId),
@@ -108,13 +113,13 @@ take_snapshot(Config) ->
take_snapshot_crash(Config) ->
UId = ?config(uid, Config),
SnapDir = ?config(snap_dir, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
MacRef = ?FUNCTION_NAME,
{State1, [{monitor, process, snapshot_writer, Pid}]} =
- ra_snapshot:begin_snapshot(Meta, MacRef, State0),
+ ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0),
undefined = ra_snapshot:current(State1),
- {Pid, {55, 2}} = ra_snapshot:pending(State1),
+ {Pid, {55, 2}, snapshot} = ra_snapshot:pending(State1),
receive
{ra_log_event, _} ->
%% just pretend the snapshot event didn't happen
@@ -137,22 +142,20 @@ take_snapshot_crash(Config) ->
init_recover(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
{State1, [{monitor, process, snapshot_writer, _}]} =
- ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0),
+ ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0),
receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- _ = ra_snapshot:complete_snapshot(IdxTerm, State1),
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ _ = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1),
ok
after 1000 ->
error(snapshot_event_timeout)
end,
%% open a new snapshot state to simulate a restart
- Recover = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ Recover = init_state(Config),
%% ensure last snapshot is recovered
%% it also needs to be validated as could have crashed mid write
undefined = ra_snapshot:pending(Recover),
@@ -165,22 +168,20 @@ init_recover(Config) ->
init_recover_voter_status(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, #{node() => #{voter_status => test}}),
{State1, [{monitor, process, snapshot_writer, _}]} =
- ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0),
+ ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot, State0),
receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- _ = ra_snapshot:complete_snapshot(IdxTerm, State1),
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ _ = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1),
ok
after 1000 ->
error(snapshot_event_timeout)
end,
%% open a new snapshot state to simulate a restart
- Recover = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ Recover = init_state(Config),
%% ensure last snapshot is recovered
%% it also needs to be validated as could have crashed mid write
undefined = ra_snapshot:pending(Recover),
@@ -193,17 +194,17 @@ init_recover_voter_status(Config) ->
init_multi(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta1 = meta(55, 2, [node()]),
Meta2 = meta(165, 2, [node()]),
- {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0),
+ {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot,
+ State0),
receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- State2 = ra_snapshot:complete_snapshot(IdxTerm, State1),
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ State2 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1),
{State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME,
- State2),
- {_, {165, 2}} = ra_snapshot:pending(State3),
+ snapshot, State2),
+ {_, {165, 2}, snapshot} = ra_snapshot:pending(State3),
{55, 2} = ra_snapshot:current(State3),
55 = ra_snapshot:last_index_for(UId),
receive
@@ -218,8 +219,7 @@ init_multi(Config) ->
end,
%% open a new snapshot state to simulate a restart
- Recover = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ Recover = init_state(Config),
%% ensure last snapshot is recovered
%% it also needs to be validated as could have crashed mid write
undefined = ra_snapshot:pending(Recover),
@@ -233,16 +233,17 @@ init_multi(Config) ->
init_recover_multi_corrupt(Config) ->
UId = ?config(uid, Config),
SnapsDir = ?config(snap_dir, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, undefined),
+ State0 = init_state(Config),
Meta1 = meta(55, 2, [node()]),
Meta2 = meta(165, 2, [node()]),
- {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, State0),
+ {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, snapshot,
+ State0),
receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- State2 = ra_snapshot:complete_snapshot(IdxTerm, State1),
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ State2 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1),
{State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME,
- State2),
- {_, {165, 2}} = ra_snapshot:pending(State3),
+ snapshot, State2),
+ {_, {165, 2}, snapshot} = ra_snapshot:pending(State3),
{55, 2} = ra_snapshot:current(State3),
55 = ra_snapshot:last_index_for(UId),
receive
@@ -261,8 +262,7 @@ init_recover_multi_corrupt(Config) ->
ok = file:delete(filename:join(Corrupt, "snapshot.dat")),
%% open a new snapshot state to simulate a restart
- Recover = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ Recover = init_state(Config),
%% ensure last snapshot is recovered
%% it also needs to be validated as could have crashed mid write
undefined = ra_snapshot:pending(Recover),
@@ -280,11 +280,12 @@ init_recover_corrupt(Config) ->
UId = ?config(uid, Config),
Meta = meta(55, 2, [node()]),
SnapsDir = ?config(snap_dir, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapsDir, undefined),
- {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0),
+ State0 = init_state(Config),
+ {State1, _} = ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, snapshot,
+ State0),
_ = receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- ra_snapshot:complete_snapshot(IdxTerm, State1)
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1)
after 1000 ->
error(snapshot_event_timeout)
end,
@@ -297,8 +298,7 @@ init_recover_corrupt(Config) ->
%% clear out ets table
ets:delete_all_objects(ra_log_snapshot_state),
%% open a new snapshot state to simulate a restart
- Recover = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ Recover = init_state(Config),
%% ensure the corrupt snapshot isn't recovered
undefined = ra_snapshot:pending(Recover),
undefined = ra_snapshot:current(Recover),
@@ -308,16 +308,14 @@ init_recover_corrupt(Config) ->
ok.
read_snapshot(Config) ->
- UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
MacRef = crypto:strong_rand_bytes(1024 * 4),
{State1, _} =
- ra_snapshot:begin_snapshot(Meta, MacRef, State0),
+ ra_snapshot:begin_snapshot(Meta, MacRef, snapshot, State0),
State = receive
- {ra_log_event, {snapshot_written, IdxTerm}} ->
- ra_snapshot:complete_snapshot(IdxTerm, State1)
+ {ra_log_event, {snapshot_written, IdxTerm, snapshot}} ->
+ ra_snapshot:complete_snapshot(IdxTerm, snapshot, State1)
after 1000 ->
error(snapshot_event_timeout)
end,
@@ -340,8 +338,7 @@ read_all_chunks(ChunkState, State, Size, Acc) ->
accept_snapshot(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
MetaBin = term_to_binary(Meta),
MacRef = crypto:strong_rand_bytes(1024 * 4),
@@ -373,8 +370,7 @@ accept_snapshot(Config) ->
abort_accept(Config) ->
UId = ?config(uid, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot,
- ?config(snap_dir, Config), undefined),
+ State0 = init_state(Config),
Meta = meta(55, 2, [node()]),
MacRef = crypto:strong_rand_bytes(1024 * 4),
MacBin = term_to_binary(MacRef),
@@ -399,13 +395,13 @@ abort_accept(Config) ->
accept_receives_snapshot_written_with_lower_index(Config) ->
UId = ?config(uid, Config),
- SnapDir = ?config(snap_dir, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined),
+ State0 = init_state(Config),
MetaLocal = meta(55, 2, [node()]),
MetaRemote = meta(165, 2, [node()]),
MetaRemoteBin = term_to_binary(MetaRemote),
%% begin a local snapshot
- {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, State0),
+ {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME,
+ snapshot, State0),
MacRef = crypto:strong_rand_bytes(1024),
MacBin = term_to_binary(MacRef),
Crc = erlang:crc32([<<(size(MetaRemoteBin)):32/unsigned>>,
@@ -422,8 +418,8 @@ accept_receives_snapshot_written_with_lower_index(Config) ->
%% then the snapshot written event is received
receive
- {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} ->
- State4 = ra_snapshot:complete_snapshot(IdxTerm, State3),
+ {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} ->
+ State4 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State3),
undefined = ra_snapshot:pending(State4),
{55, 2} = ra_snapshot:current(State4),
55 = ra_snapshot:last_index_for(UId),
@@ -439,12 +435,12 @@ accept_receives_snapshot_written_with_lower_index(Config) ->
accept_receives_snapshot_written_with_higher_index(Config) ->
UId = ?config(uid, Config),
- SnapDir = ?config(snap_dir, Config),
- State0 = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined),
+ State0 = init_state(Config),
MetaRemote = meta(55, 2, [node()]),
MetaLocal = meta(165, 2, [node()]),
%% begin a local snapshot
- {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME, State0),
+ {State1, _} = ra_snapshot:begin_snapshot(MetaLocal, ?FUNCTION_NAME,
+ snapshot, State0),
MacRef = crypto:strong_rand_bytes(1024),
MacBin = term_to_binary(MacRef),
%% split into 1024 max byte chunks
@@ -459,8 +455,8 @@ accept_receives_snapshot_written_with_higher_index(Config) ->
%% then the snapshot written event is received
receive
- {ra_log_event, {snapshot_written, {55, 2} = IdxTerm}} ->
- State4 = ra_snapshot:complete_snapshot(IdxTerm, State3),
+ {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, snapshot}} ->
+ State4 = ra_snapshot:complete_snapshot(IdxTerm, snapshot, State3),
undefined = ra_snapshot:pending(State4),
{55, 2} = ra_snapshot:current(State4),
55 = ra_snapshot:last_index_for(UId),
@@ -474,6 +470,12 @@ accept_receives_snapshot_written_with_higher_index(Config) ->
end,
ok.
+init_state(Config) ->
+ ra_snapshot:init(?config(uid, Config), ra_log_snapshot,
+ ?config(snap_dir, Config),
+ ?config(checkpoint_dir, Config),
+ undefined, ?config(max_checkpoints, Config)).
+
meta(Idx, Term, Cluster) ->
#{index => Idx,
term => Term,