Detect when a segment has been modified.
When executing a read plan it is possible that the plan refers to
indexes that are not yet in the index of a segment that is still
being written to. This commit detects and handles that case.
kjnilsson committed Dec 13, 2024
1 parent 9a3a77b commit f2fdf7b
Showing 6 changed files with 103 additions and 71 deletions.
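
The change to src/ra_log_reader.erl below treats {error, modified} from ra_log_segment:read_sparse/4 as retryable: the stale cached handle is evicted from the FLRU cache, the segment is reopened (picking up the freshly written index entries) and the read is attempted again. A condensed sketch of that retry path, for illustration only (read_with_retry is a hypothetical wrapper; get_segment_ext and ra_flru:evict are the helpers that appear in the diff):

    %% Illustrative sketch of the retry path added to exec_read_plan/5.
    read_with_retry(Dir, BaseName, Idxs, Fun, Acc0, Open0) ->
        {Seg, Open1} = get_segment_ext(Dir, Open0, BaseName),
        case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc0) of
            {ok, _NumRead, Acc} ->
                {Acc, Open1};
            {error, modified} ->
                %% the file has grown past the data offset recorded when the
                %% cached handle was opened: evict it, reopen and read again
                {_, Open2} = ra_flru:evict(BaseName, Open1),
                {SegNew, Open} = get_segment_ext(Dir, Open2, BaseName),
                {ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc0),
                {Acc, Open}
        end.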
15 changes: 11 additions & 4 deletions src/ra_log_reader.erl
@@ -224,9 +224,16 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
end,
lists:foldl(
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
{Seg, Open} = get_segment_ext(Dir, Open1, BaseName),
{_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1),
{Acc, Open}
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
{ok, _, Acc} ->
{Acc, Open2};
{error, modified} ->
{_, Open3} = ra_flru:evict(BaseName, Open2),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
{Acc, Open}
end
end, {Acc0, Open0}, Plan).

-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
@@ -335,7 +342,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs,
lists:foldl(
fun ({Idxs, Fn}, {Open0, C, En0}) ->
{Seg, Open} = get_segment(Cfg, Open0, Fn),
{ReadSparseCount, Entries} =
{ok, ReadSparseCount, Entries} =
ra_log_segment:read_sparse(Seg, Idxs,
fun (I, T, B, Acc) ->
[{I, T, binary_to_term(B)} | Acc]
65 changes: 35 additions & 30 deletions src/ra_log_segment.erl
@@ -12,10 +12,10 @@
append/4,
sync/1,
fold/6,
is_modified/1,
read_sparse/4,
term_query/2,
close/1,
close_fd/1,
range/1,
flush/1,
max_count/1,
@@ -28,6 +28,8 @@

-include("ra.hrl").

-include_lib("kernel/include/file.hrl").

-define(VERSION, 2).
-define(MAGIC, "RASG").
-define(HEADER_SIZE, 4 + (16 div 8) + (16 div 8)).
@@ -190,16 +192,15 @@ append(#state{cfg = #cfg{max_pending = PendingCount},
append(#state{cfg = #cfg{version = Version,
mode = append} = Cfg,
index_offset = IndexOffset,
data_start = DataStart,
data_offset = DataOffset,
range = Range0,
pending_count = PendCnt,
pending_index = IdxPend0,
pending_data = DataPend0} = State,
Index, Term, {Length, Data}) ->
% check if file is full
case IndexOffset < DataStart of
true ->

case is_full(State) of
false ->
% TODO: check length is less than #FFFFFFFF ??
Checksum = compute_checksum(Cfg, Data),
OSize = offset_size(Version),
@@ -215,7 +216,7 @@ append(#state{cfg = #cfg{version = Version,
pending_data = [DataPend0, Data],
pending_count = PendCnt + 1}
};
false ->
true ->
{error, full}
end;
append(State, Index, Term, Data)
@@ -277,37 +278,44 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
FromIdx, ToIdx, Fun, AccFun, Acc) ->
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).

-spec is_modified(state()) -> boolean().
is_modified(#state{cfg = #cfg{fd = Fd},
data_offset = DataOffset} = State) ->
case is_full(State) of
true ->
%% a full segment cannot be appended to.
false;
false ->
%% get info and compare to data_offset
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
Size > DataOffset
end.

-spec read_sparse(state(), [ra_index()],
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
Acc) ->
{NumRead :: non_neg_integer(), Acc}
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
when Acc :: term().
read_sparse(#state{index = Index,
cfg = #cfg{fd = undefined,
filename = File }}, Indexes, AccFun, Acc) ->
%% open a temporary file descriptor for the lifetime of this read only
{ok, Fd} = file:open(File, [read, raw, binary]),
ok = file:advise(Fd, 0, 0, random),
Cache0 = prepare_cache(Fd, Indexes, Index),
Res = read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0),
_ = file:close(Fd),
Res;
read_sparse(#state{index = Index,
cfg = #cfg{fd = Fd}}, Indexes, AccFun, Acc) ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0).
cfg = #cfg{fd = Fd}} = State,
Indexes, AccFun, Acc) ->
case is_modified(State) of
true ->
{error, modified};
false ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
end.

read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
{Num, Acc};
{ok, Num, Acc};
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
when is_map_key(NextIdx, Index) ->
{Term, Pos, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Pos, Length) of
false ->
case prepare_cache(Fd, Indexes, Index) of
undefined ->
% {ok, Data, _} = pread(Fd, undefined, Offset, Length),

%% TODO: check for partial data?
{ok, Data} = file:pread(Fd, Pos, Length),
read_sparse0(Fd, Rem, Index, undefined,
@@ -429,13 +437,6 @@ close(#state{cfg = #cfg{fd = Fd}}) ->
_ = file:close(Fd),
ok.

-spec close_fd(state()) -> state().
close_fd(#state{cfg = #cfg{fd = undefined}} = State) ->
State;
close_fd(#state{cfg = #cfg{fd = Fd} = Cfg} = State) ->
_ = file:close(Fd),
State#state{cfg = Cfg#cfg{fd = undefined}}.

%%% Internal

is_same_filename_all(Fn, Fn) ->
@@ -648,6 +649,10 @@ validate_checksum(0, _) ->
validate_checksum(Crc, Data) ->
Crc == erlang:crc32(Data).

is_full(#state{index_offset = IndexOffset,
data_start = DataStart}) ->
IndexOffset >= DataStart.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

2 changes: 1 addition & 1 deletion src/ra_server.erl
@@ -1728,7 +1728,7 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},
become(leader, OldRaftState, #{cluster := Cluster,
cluster_change_permitted := CCP0,
log := Log0} = State) ->
Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0),
Log = ra_log:release_resources(maps:size(Cluster), sequential, Log0),
CCP = case OldRaftState of
await_condition ->
CCP0;
16 changes: 16 additions & 0 deletions test/ra_log_2_SUITE.erl
@@ -56,6 +56,7 @@ all_tests() ->
transient_writer_is_handled,
read_opt,
sparse_read,
read_plan_modified,
read_plan,
sparse_read_out_of_range,
sparse_read_out_of_range_2,
@@ -481,6 +482,21 @@ sparse_read(Config) ->
{99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2),
ok.

read_plan_modified(Config) ->
Log0 = ra_log_init(Config),
Log1 = write_and_roll(1, 2, 1, Log0, 50),
Log2 = deliver_all_log_events(Log1, 100),
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),

Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
%% assert we can read the newly appended item with the cached
%% segment
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
ra_log:close(Log),
ok.

read_plan(Config) ->
Num = 256 * 2,
Div = 2,
68 changes: 36 additions & 32 deletions test/ra_log_segment_SUITE.erl
@@ -32,7 +32,7 @@ all_tests() ->
overwrite,
term_query,
write_many,
open_read_close,
read_sparse_append_read,
open_invalid,
corrupted_segment,
large_segment,
@@ -82,6 +82,7 @@ corrupted_segment(Config) ->
%% ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]),
%% check that the current state throws a missing key
{ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}),
?assertNot(ra_log_segment:is_modified(SegR0)),
?assertExit({missing_key, 2},
read_sparse(SegR0, [1, 2])),

@@ -211,11 +212,13 @@ segref(Config) ->
full_file(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, "seg1.seg"),
Data = make_data(1024),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2}),
Data = make_data(10),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2,
max_pending => 1}),
{ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, Data),
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
?assertNot(ra_log_segment:is_modified(Seg)),
{1,2} = ra_log_segment:range(Seg),
ok = ra_log_segment:close(Seg),
ok.
@@ -398,35 +401,36 @@ write_many(Config) ->
ok.



open_read_close(Config) ->
read_sparse_append_read(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, <<"0000000.segment">>),
{ok, Seg0} = ra_log_segment:open(Fn, #{}),
Data = crypto:strong_rand_bytes(4096),
S0 = write_until_full(1, 2, Data, Seg0),
ra_log_segment:close(S0),
% ToRead = lists:seq(1, 4096, 32),
% ct:pal("to read ~p", [ToRead]),

{ok, S1} = ra_log_segment:open(Fn, #{mode => read}),
S = ra_log_segment:close_fd(S1),
% S = S1,
{Taken, _} =
timer:tc(
fun () ->
[begin
_ = ra_log_segment:read_sparse(S, [Idx],
fun (I, _, _, Acc) ->
[I | Acc]
end, []),
ok
end || Idx <- lists:seq(1, 4096)]
end),

ct:pal("Taken ~bms", [Taken div 1000]),
{ok, W0} = ra_log_segment:open(Fn, #{}),
Data = <<"banana">>,
Term = 1,
%% write two entries in term 1
{ok, W1} = ra_log_segment:append(W0, 1, Term, Data),
{ok, W2} = ra_log_segment:append(W1, 2, Term, Data),
{ok, W3} = ra_log_segment:flush(W2),


{ok, R0} = ra_log_segment:open(Fn, #{mode => read}),
{ok, 2, [_, _]} = ra_log_segment:read_sparse(R0, [1, 2],
fun (I, _, _, Acc) ->
[I | Acc]
end, []),

?assertNot(ra_log_segment:is_modified(R0)),
%% overwrite in term 2
{ok, W4} = ra_log_segment:append(W3, 2, 2, <<"apple">>),
{ok, W5} = ra_log_segment:append(W4, 3, 2, <<"apple">>),
{ok, W} = ra_log_segment:flush(W5),
?assert(ra_log_segment:is_modified(R0)),
{error, modified} = ra_log_segment:read_sparse(R0, [2],
fun (_I, _, B, Acc) ->
[B | Acc]
end, []),
ra_log_segment:close(W),
ra_log_segment:close(R0),
ok.

write_until_full(Idx, Term, Data, Seg0) ->
@@ -443,8 +447,8 @@ make_data(Size) ->
term_to_binary(crypto:strong_rand_bytes(Size)).

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).
8 changes: 4 additions & 4 deletions test/ra_log_segment_writer_SUITE.erl
@@ -800,10 +800,10 @@ segments_for(UId, DataDir) ->
SegFiles.

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).

get_names(System) when is_atom(System) ->
