From da18d26225a6b72dcf38e4b3270b04d3e1b3805d Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 29 Nov 2024 07:55:20 +0000 Subject: [PATCH] segment writer bug fix --- src/ra_log_segment_writer.erl | 41 +++++++++-------- test/ra_log_segment_writer_SUITE.erl | 68 ++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 18 deletions(-) diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index e01e2b4f..2729a8f1 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -256,31 +256,36 @@ flush_mem_table_ranges({ServerUId, TidRanges0}, %% truncate and limit all ranges to create a contiguous non-overlapping %% list of tid ranges to flush to disk - TidRanges = - lists:foldl(fun ({T, Range}, []) -> - [{T, ra_range:truncate(SnapIdx, Range)}]; - ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> - Range1 = ra_range:truncate(SnapIdx, Range0), - case ra_range:limit(Start, Range1) of - undefined -> - Acc; - Range -> - [{T, Range} | Acc] - end - end, [], TidRanges0), + TidRanges = lists:foldl( + fun ({T, Range0}, []) -> + case ra_range:truncate(SnapIdx, Range0) of + undefined -> + []; + Range -> + [{T, Range}] + end; + ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> + Range1 = ra_range:truncate(SnapIdx, Range0), + case ra_range:limit(Start, Range1) of + undefined -> + Acc; + Range -> + [{T, Range} | Acc] + end + end, [], TidRanges0), SegRefs0 = lists:append( - [flush_mem_table_range(ServerUId, TidRange, State) - || {_Tid, Range} = TidRange <- TidRanges, - Range =/= undefined]), + lists:reverse( + [flush_mem_table_range(ServerUId, TidRange, State) + || {_Tid, Range} = TidRange <- TidRanges, + Range =/= undefined])), %% compact cases where a segment was appended in a subsequent call to %% flush_mem_table_range SegRefs = lists:reverse( lists:foldl( - fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) -> - %% same - [New | Rem]; + fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) -> + Acc; (Seg, Acc) -> [Seg | Acc] end, [], SegRefs0)), diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 25d4e1c6..6ae04fab 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -32,6 +32,8 @@ all_tests() -> accept_mem_tables_for_down_server, accept_mem_tables_with_deleted_server, accept_mem_tables_with_corrupt_segment, + accept_mem_tables_multiple_ranges, + accept_mem_tables_multiple_ranges_snapshot, truncate_segments, truncate_segments_with_pending_update, truncate_segments_with_pending_overwrite, @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) -> end, ok = gen_server:stop(Pid), ok. + accept_mem_tables_multi_segment_overwrite(Config) -> Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) -> ok = gen_server:stop(TblWriterPid), ok. +accept_mem_tables_multiple_ranges(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([ + {49, 64, _}, + {33, 48, _}, + {17, 32, _}, + {1, 16, _} + ], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + +accept_mem_tables_multiple_ranges_snapshot(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ets:insert(ra_log_snapshot_state, {UId, 64}), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + truncate_segments(Config) -> Dir = ?config(wal_dir, Config), SegConf = #{max_count => 12},