Skip to content

Commit

Permalink
Merge pull request #415 from rabbitmq/md-checkpoints
Browse files Browse the repository at this point in the history
Checkpoints
  • Loading branch information
kjnilsson authored Feb 13, 2024
2 parents 00d660b + d9890cc commit a9faaab
Show file tree
Hide file tree
Showing 15 changed files with 1,111 additions and 212 deletions.
21 changes: 21 additions & 0 deletions docs/internals/INTERNALS.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,27 @@ It is not guaranteed that a snapshot will be taken. A decision to take
a snapshot or to delay it is taken using a number of internal Ra state factors.
The goal is to minimise disk I/O activity when possible.

### Checkpointing

Checkpoints are nearly the same concept as snapshots; the difference is how
they treat the log. Snapshotting truncates the log up to the snapshot's index,
which might be undesirable for machines which read from the log with the
`{log, Indexes, Fun}` effect mentioned above.

The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to
trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes
and when a checkpoint is taken, the machine state is saved to disk and can be
used for recovery when the machine restarts. A checkpoint being written does
not trigger any log truncation though.

The `{release_cursor, RaftIndex}` effect can then be used to promote the most
recent existing checkpoint with an index lower than or equal to `RaftIndex`
into a proper snapshot, and any log entries older than the promoted
checkpoint's index are then truncated.

These two effects are intended for machines that use the `{log, Indexes, Fun}`
effect and can substantially improve machine recovery time compared to
snapshotting alone, especially when the machine needs to keep old log entries
around for a long time.

## State Machine Versioning

It is eventually necessary to make changes to the state machine
Expand Down
8 changes: 8 additions & 0 deletions docs/internals/STATE_MACHINE_TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,11 @@ or similar.
To (potentially) trigger a snapshot, return the `{release_cursor, RaftIndex, MachineState}`
effect. This is why the raft index is included in the `apply/3` function. Ra will
only create a snapshot if doing so will result in log segments being deleted.

For machines that must keep log segments on disk for some time, the
`{checkpoint, RaftIndex, MachineState}` effect can be used. This creates a
snapshot-like view of the machine state on disk but doesn't trigger log
truncation. Checkpoints can later be promoted to snapshots and trigger log
truncation by emitting a `{release_cursor, RaftIndex}` effect. The most
recent checkpoint with an index smaller than or equal to `RaftIndex` will be
promoted.
20 changes: 17 additions & 3 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@

%% Name of the default snapshot module (presumably used when no other
%% snapshot module is configured -- confirm against the code reading it).
-define(DEFAULT_SNAPSHOT_MODULE, ra_log_snapshot).

%% Default limit on the number of checkpoints (presumably the maximum number
%% kept on disk before older ones are removed -- confirm against the code
%% that reads this macro).
-define(DEFAULT_MAX_CHECKPOINTS, 10).

-define(RA_LOG_COUNTER_FIELDS,
[{write_ops, ?C_RA_LOG_WRITE_OPS, counter,
"Total number of write ops"},
Expand All @@ -254,6 +256,10 @@
{snapshot_bytes_written, ?C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, counter,
"Number of snapshot bytes written (not installed)"},
{open_segments, ?C_RA_LOG_OPEN_SEGMENTS, gauge, "Number of open segments"},
{checkpoints_written, ?C_RA_LOG_CHECKPOINTS_WRITTEN, counter,
"Total number of checkpoints written"},
{checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter,
"Number of checkpoint bytes written"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
Expand All @@ -268,7 +274,9 @@
-define(C_RA_LOG_SNAPSHOTS_INSTALLED, 10).
-define(C_RA_LOG_SNAPSHOT_BYTES_WRITTEN, 11).
-define(C_RA_LOG_OPEN_SEGMENTS, 12).
-define(C_RA_LOG_RESERVED, 13).
%% Indexes of the `checkpoints_written' and `checkpoint_bytes_written'
%% entries in ?RA_LOG_COUNTER_FIELDS.
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
%% Index of the `reserved_1' entry. The ?C_RA_SRV_* indexes are defined as
%% offsets from this value, so it has to move whenever a log counter is added.
-define(C_RA_LOG_RESERVED, 15).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand All @@ -290,7 +298,8 @@
-define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18).
-define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19).
-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20).
-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21).
%% Index of the `checkpoints' entry in ?RA_SRV_COUNTER_FIELDS.
-define(C_RA_SRV_CHECKPOINTS, ?C_RA_LOG_RESERVED + 21).
%% Index of the `reserved_2' entry. The ?C_RA_SVR_METRIC_* indexes are
%% defined as offsets from this value.
-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 22).


-define(RA_SRV_COUNTER_FIELDS,
Expand Down Expand Up @@ -335,6 +344,8 @@
"Total number of local queries"},
{invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter,
"Total number of commands received with an invalid reply-mode"},
{checkpoints, ?C_RA_SRV_CHECKPOINTS, counter,
"The number of checkpoint effects executed"},
{reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"}
]).

Expand All @@ -345,6 +356,7 @@
-define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5).
-define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6).
-define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7).
%% Index of the `checkpoint_index' entry in ?RA_SRV_METRICS_COUNTER_FIELDS.
-define(C_RA_SVR_METRIC_CHECKPOINT_INDEX, ?C_RA_SRV_RESERVED + 8).

-define(RA_SRV_METRICS_COUNTER_FIELDS,
[
Expand All @@ -360,7 +372,9 @@
"The last fully written and fsynced index of the log."},
{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
"Approximate time taken from an entry being written to the log until it is committed."},
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."}
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."},
{checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter,
"The current checkpoint index."}
]).

-define(RA_COUNTER_FIELDS,
Expand Down
53 changes: 46 additions & 7 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
retry/2,
retry/3,
write_file/2,
write_file/3,
sync_file/1,
lists_chunk/2,
lists_detect_sort/1,
lists_shuffle/1,
is_dir/1,
is_file/1,
ensure_dir/1,
Expand All @@ -49,6 +52,10 @@
maps_merge_with/3
]).

%% Failure reasons referenced by the return specs of the file helpers in this
%% module (write_file/2,3 and sync_file/1).
-type file_err() :: file:posix() | badarg | terminated | system_limit.

-export_type([file_err/0]).

-include_lib("kernel/include/file.hrl").

ceiling(X) when X < 0 ->
Expand Down Expand Up @@ -313,18 +320,23 @@ retry(Func, Attempt, Sleep) ->
retry(Func, Attempt - 1)
end.


%% Write `IOData' to the file `Name' with syncing enabled.
%% Shorthand for `write_file(Name, IOData, true)'.
%% Returns `ok', or the `{error, Reason}' tuple of the file operation that
%% failed.
-spec write_file(file:name_all(), iodata()) ->
    ok | {error, file_err()}.
write_file(Name, IOData) ->
    write_file(Name, IOData, true).

%% Write `IOData' to the file `Name'.
%% The file is opened in raw `write' mode, so an existing file is replaced.
%% When `Sync' is `true' the data is synced to disk before the descriptor is
%% closed; when `false' the descriptor is closed without syncing.
%% The descriptor is closed on every path so that it is never leaked.
%% Returns `ok', or the `{error, Reason}' tuple of the file operation that
%% failed.
-spec write_file(file:name_all(), iodata(), Sync :: boolean()) ->
    ok | {error, file_err()}.
write_file(Name, IOData, Sync) ->
    case file:open(Name, [binary, write, raw]) of
        {ok, Fd} ->
            case file:write(Fd, IOData) of
                ok ->
                    case Sync of
                        true ->
                            sync_and_close_fd(Fd);
                        false ->
                            %% no sync requested, but the descriptor must
                            %% still be released
                            file:close(Fd)
                    end;
                Err ->
                    %% best-effort close; report the write error
                    _ = file:close(Fd),
                    Err
            end;
        Err ->
            Err
    end.

%% Sync the contents of the file `Name' to disk.
%% The file is opened with both `read' and `write': `write' on its own
%% truncates an existing file, which would discard the very data that is
%% meant to be synced.
%% Returns `ok', or the `{error, Reason}' tuple of the file operation that
%% failed.
-spec sync_file(file:name_all()) ->
    ok | {error, file_err()}.
sync_file(Name) ->
    case file:open(Name, [binary, read, write, raw]) of
        {ok, Fd} ->
            sync_and_close_fd(Fd);
        Err ->
            Err
    end.

%% Sync the open descriptor `Fd' to disk and then close it.
%% If the sync fails the descriptor is still closed and the sync error is
%% returned; otherwise the result of closing the descriptor is returned.
-spec sync_and_close_fd(file:fd()) ->
    ok | {error, file_err()}.
sync_and_close_fd(Fd) ->
    case file:sync(Fd) of
        ok ->
            file:close(Fd);
        Err ->
            %% best-effort close; the sync failure is what gets reported
            _ = file:close(Fd),
            Err
    end.

lists_chunk(0, List) ->
error(invalid_size, [0, List]);
lists_chunk(Size, List) ->
Expand Down Expand Up @@ -382,6 +415,12 @@ do_ascending(A, [B | Rem])
do_ascending(_A, _) ->
unsorted.

%% Return the elements of the given list in a random order.
%% Each element is paired with a random float from rand:uniform/0, the pairs
%% are sorted on that float, and the floats are then dropped again.
-spec lists_shuffle(list()) -> list().
lists_shuffle(Items) ->
    Tagged = lists:map(fun(Item) -> {rand:uniform(), Item} end, Items),
    {_Keys, Shuffled} = lists:unzip(lists:keysort(1, Tagged)),
    Shuffled.

is_dir(Dir) ->
case prim_file:read_file_info(Dir) of
{ok, #file_info{type=directory}} ->
Expand Down
Loading

0 comments on commit a9faaab

Please sign in to comment.