Skip to content

Commit

Permalink
Segment writer bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Dec 2, 2024
1 parent 324c36f commit f3aa692
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ handle_event({segments, TidRanges, NewSegs},
readers = Readers
} = State0) ->
Reader = ra_log_reader:update_segments(NewSegs, Reader0),
%% the tid ranges arrive in the order they were written so we need to
%% foldr here to process the oldest first
%% the tid ranges arrive in the reverse order they were written
%% (new -> old) so we need to foldr here to process the oldest first
Mt = lists:foldr(
fun ({Tid, Range}, Acc0) ->
{Spec, Acc} = ra_mt:record_flushed(Tid, Range, Acc0),
Expand Down
60 changes: 36 additions & 24 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
T1 = erlang:monotonic_time(),
_ = [begin
{_, Failures} = ra_lib:partition_parallel(
fun (E) ->
ok = flush_mem_table_ranges(E, State),
fun (TidRange) ->
ok = flush_mem_table_ranges(TidRange, State),
true
end, Tabs, infinity),
case Failures of
Expand Down Expand Up @@ -253,34 +253,46 @@ get_overview(#state{data_dir = Dir,
flush_mem_table_ranges({ServerUId, TidRanges0},
#state{system = System} = State) ->
SnapIdx = snap_idx(ServerUId),
%% TidRanges arrive here sorted new -> old.

%% truncate and limit all ranges to create a contiguous non-overlapping
%% list of tid ranges to flush to disk
TidRanges =
lists:foldl(fun ({T, Range}, []) ->
[{T, ra_range:truncate(SnapIdx, Range)}];
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),
%% now TidRanges are sorted old -> new, i.e. the correct order of
%% processing
TidRanges = lists:foldl(
fun ({T, Range0}, []) ->
case ra_range:truncate(SnapIdx, Range0) of
undefined ->
[];
Range ->
[{T, Range}]
end;
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),

SegRefs0 = lists:append(
[flush_mem_table_range(ServerUId, TidRange, State)
|| {_Tid, Range} = TidRange <- TidRanges,
Range =/= undefined]),
lists:reverse(
%% segrefs are returned in appended order so new -> old
%% so we need to reverse them so that the final appended list
%% of segrefs is in the old -> new order
[flush_mem_table_range(ServerUId, TidRange, State)
|| TidRange <- TidRanges])),

%% compact cases where a segment was appended in a subsequent call to
%% flush_mem_table_range
%% the list of segrefs is returned in new -> old order which is the same
%% order they are kept by the ra_log
SegRefs = lists:reverse(
lists:foldl(
fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) ->
%% same
[New | Rem];
fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) ->
Acc;
(Seg, Acc) ->
[Seg | Acc]
end, [], SegRefs0)),
Expand Down Expand Up @@ -372,8 +384,8 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
StartIdx = start_index(UId, Idx),
case Idx < StartIdx of
true ->
%% TODO: we could combine registered check with this
%% although registered check could be slow
%% a snapshot must have been completed after we last checked
%% the start idx, continue flush from new start index.
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg0,
Closed, State);
false ->
Expand All @@ -383,11 +395,11 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
[Idx, UId, StartIdx]),
case ra_directory:is_registered_uid(State#state.system, UId) of
true ->
?ERROR("segment_writer: uid ~s is registered, exiting...",
?ERROR("segment_writer: uid ~ts is registered, exiting...",
[UId]),
exit({missing_index, UId, Idx});
false ->
?INFO("segment_writer: UId ~s was not registered, skipping",
?INFO("segment_writer: uid ~ts was not registered, skipping",
[UId]),
undefined
end
Expand Down
13 changes: 11 additions & 2 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ receive_segment(Config) ->
ok.

delete_during_segment_flush(Config) ->
%% this test doesn't necessarily trigger the potential issue but is
%% worth keeping around
Log0 = ra_log_init(Config),
Data = crypto:strong_rand_bytes(4000),
% write a few entries
Expand All @@ -228,14 +230,21 @@ delete_during_segment_flush(Config) ->
{PostWritten, _} = ra_log:last_written(L),
PostWritten >= (PreWritten + 10000)
end, 100),
_Ref = monitor(process, ra_log_segment_writer),
Ref = monitor(process, ra_log_segment_writer),
% force wal roll over
ok = ra_log_wal:force_roll_over(ra_log_wal),

timer:sleep(0),
ra_log:delete_everything(Log2),


timer:sleep(1000),
receive
{'DOWN', Ref, _, _, _} ->
flush(),
ct:fail("segment writer unexpectedly exited")
after 100 ->
ok
end,
flush(),

ok.
Expand Down
68 changes: 68 additions & 0 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ all_tests() ->
accept_mem_tables_for_down_server,
accept_mem_tables_with_deleted_server,
accept_mem_tables_with_corrupt_segment,
accept_mem_tables_multiple_ranges,
accept_mem_tables_multiple_ranges_snapshot,
truncate_segments,
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
Expand Down Expand Up @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) ->
end,
ok = gen_server:stop(Pid),
ok.

accept_mem_tables_multi_segment_overwrite(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down Expand Up @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) ->
ok = gen_server:stop(TblWriterPid),
ok.

%% Submits two mem tables for the same UId (indexes 1..32 and 33..64) to the
%% segment writer in a single accept_mem_tables call and checks the segment
%% refs reported back in the resulting `segments' log event.
accept_mem_tables_multiple_ranges(Config) ->
    WriterConf = #{system => default,
                   name => ?SEGWR,
                   data_dir => ?config(wal_dir, Config),
                   %% 16 entries per segment, so 64 entries are expected to
                   %% produce the four segment refs asserted below
                   segment_conf => #{max_count => 16}},
    {ok, WriterPid} = ra_log_segment_writer:start_link(WriterConf),
    UId = ?config(uid, Config),
    OldMt = make_mem_table(UId, [{I, 42, I} || I <- lists:seq(1, 32)]),
    NewMt = make_mem_table(UId, [{I, 42, I} || I <- lists:seq(33, 64)]),
    %% the table holding the higher indexes is listed first (new -> old)
    TidRanges = [{ra_mt:tid(Mt), ra_mt:range(Mt)} || Mt <- [NewMt, OldMt]],
    ok = ra_log_segment_writer:accept_mem_tables(?SEGWR,
                                                 #{UId => TidRanges},
                                                 make_wal(Config, "w1.wal")),
    receive
        {ra_log_event, {segments, _FlushedTidRanges, SegRefs}} ->
            %% segment refs are expected highest range first
            ?assertMatch([{49, 64, _},
                          {33, 48, _},
                          {17, 32, _},
                          {1, 16, _}], SegRefs)
    after 3000 ->
        flush(),
        throw(ra_log_event_timeout)
    end,
    ok = gen_server:stop(WriterPid).

%% Same setup as the multiple ranges case (two mem tables covering indexes
%% 1..32 and 33..64 for one UId) but with {UId, 64} inserted into the
%% ra_log_snapshot_state table before the mem tables are submitted.
%% The `segments' log event is then expected to carry no segment refs.
accept_mem_tables_multiple_ranges_snapshot(Config) ->
    WriterConf = #{system => default,
                   name => ?SEGWR,
                   data_dir => ?config(wal_dir, Config),
                   segment_conf => #{max_count => 16}},
    {ok, WriterPid} = ra_log_segment_writer:start_link(WriterConf),
    UId = ?config(uid, Config),
    OldMt = make_mem_table(UId, [{I, 42, I} || I <- lists:seq(1, 32)]),
    NewMt = make_mem_table(UId, [{I, 42, I} || I <- lists:seq(33, 64)]),
    %% the table holding the higher indexes is listed first (new -> old)
    TidRanges = [{ra_mt:tid(Mt), ra_mt:range(Mt)} || Mt <- [NewMt, OldMt]],
    %% NOTE(review): presumably this marks everything up to index 64 as
    %% covered by a snapshot, leaving nothing to flush - confirm against
    %% the segment writer's snapshot index lookup
    ets:insert(ra_log_snapshot_state, {UId, 64}),
    ok = ra_log_segment_writer:accept_mem_tables(?SEGWR,
                                                 #{UId => TidRanges},
                                                 make_wal(Config, "w1.wal")),
    receive
        {ra_log_event, {segments, _FlushedTidRanges, SegRefs}} ->
            ?assertMatch([], SegRefs)
    after 3000 ->
        flush(),
        throw(ra_log_event_timeout)
    end,
    ok = gen_server:stop(WriterPid).

truncate_segments(Config) ->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 12},
Expand Down

0 comments on commit f3aa692

Please sign in to comment.