Skip to content

Commit

Permalink
segment writer bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Nov 29, 2024
1 parent 324c36f commit da18d26
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,31 +256,36 @@ flush_mem_table_ranges({ServerUId, TidRanges0},

%% truncate and limit all ranges to create a contiguous non-overlapping
%% list of tid ranges to flush to disk
TidRanges =
lists:foldl(fun ({T, Range}, []) ->
[{T, ra_range:truncate(SnapIdx, Range)}];
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),
TidRanges = lists:foldl(
fun ({T, Range0}, []) ->
case ra_range:truncate(SnapIdx, Range0) of
undefined ->
[];
Range ->
[{T, Range}]
end;
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),

SegRefs0 = lists:append(
[flush_mem_table_range(ServerUId, TidRange, State)
|| {_Tid, Range} = TidRange <- TidRanges,
Range =/= undefined]),
lists:reverse(
[flush_mem_table_range(ServerUId, TidRange, State)
|| {_Tid, Range} = TidRange <- TidRanges,
Range =/= undefined])),

%% compact cases where a segment was appended in a subsequent call to
%% flush_mem_table_range
SegRefs = lists:reverse(
lists:foldl(
fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) ->
%% same
[New | Rem];
fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) ->
Acc;
(Seg, Acc) ->
[Seg | Acc]
end, [], SegRefs0)),
Expand Down
68 changes: 68 additions & 0 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ all_tests() ->
accept_mem_tables_for_down_server,
accept_mem_tables_with_deleted_server,
accept_mem_tables_with_corrupt_segment,
accept_mem_tables_multiple_ranges,
accept_mem_tables_multiple_ranges_snapshot,
truncate_segments,
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
Expand Down Expand Up @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) ->
end,
ok = gen_server:stop(Pid),
ok.

accept_mem_tables_multi_segment_overwrite(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down Expand Up @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) ->
ok = gen_server:stop(TblWriterPid),
ok.

accept_mem_tables_multiple_ranges(Config)->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 16},
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir,
segment_conf => SegConf}),
UId = ?config(uid, Config),
Entries = [{N, 42, N} || N <- lists:seq(1, 32)],
Mt = make_mem_table(UId, Entries),
Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)],
Mt2 = make_mem_table(UId, Entries2),
Ranges = #{UId => [
{ra_mt:tid(Mt2), ra_mt:range(Mt2)},
{ra_mt:tid(Mt), ra_mt:range(Mt)}
]},
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),
receive
{ra_log_event, {segments, _TidRanges, SegRefs}} ->
?assertMatch([
{49, 64, _},
{33, 48, _},
{17, 32, _},
{1, 16, _}
], SegRefs),
ok
after 3000 ->
flush(),
throw(ra_log_event_timeout)
end,
ok = gen_server:stop(TblWriterPid),
ok.

accept_mem_tables_multiple_ranges_snapshot(Config)->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 16},
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir,
segment_conf => SegConf}),
UId = ?config(uid, Config),
Entries = [{N, 42, N} || N <- lists:seq(1, 32)],
Mt = make_mem_table(UId, Entries),
Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)],
Mt2 = make_mem_table(UId, Entries2),
Ranges = #{UId => [
{ra_mt:tid(Mt2), ra_mt:range(Mt2)},
{ra_mt:tid(Mt), ra_mt:range(Mt)}
]},
ets:insert(ra_log_snapshot_state, {UId, 64}),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),

receive
{ra_log_event, {segments, _TidRanges, SegRefs}} ->
?assertMatch([], SegRefs),
ok
after 3000 ->
flush(),
throw(ra_log_event_timeout)
end,
ok = gen_server:stop(TblWriterPid),
ok.

truncate_segments(Config) ->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 12},
Expand Down

0 comments on commit da18d26

Please sign in to comment.