diff --git a/src/ra_log.erl b/src/ra_log.erl index ab6406e7..8a490344 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -683,8 +683,8 @@ handle_event({segments, TidRanges, NewSegs}, readers = Readers } = State0) -> Reader = ra_log_reader:update_segments(NewSegs, Reader0), - %% the tid ranges arrive in the order they were written so we need to - %% foldr here to process the oldest first + %% the tid ranges arrive in the reverse order they were written + %% (new -> old) so we need to foldr here to process the oldest first Mt = lists:foldr( fun ({Tid, Range}, Acc0) -> {Spec, Acc} = ra_mt:record_flushed(Tid, Range, Acc0), diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index e01e2b4f..e73a7bfe 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -158,8 +158,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, T1 = erlang:monotonic_time(), _ = [begin {_, Failures} = ra_lib:partition_parallel( - fun (E) -> - ok = flush_mem_table_ranges(E, State), + fun (TidRange) -> + ok = flush_mem_table_ranges(TidRange, State), true end, Tabs, infinity), case Failures of @@ -253,34 +253,46 @@ get_overview(#state{data_dir = Dir, flush_mem_table_ranges({ServerUId, TidRanges0}, #state{system = System} = State) -> SnapIdx = snap_idx(ServerUId), + %% TidRanges arrive here sorted new -> old. %% truncate and limit all ranges to create a contiguous non-overlapping %% list of tid ranges to flush to disk - TidRanges = - lists:foldl(fun ({T, Range}, []) -> - [{T, ra_range:truncate(SnapIdx, Range)}]; - ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> - Range1 = ra_range:truncate(SnapIdx, Range0), - case ra_range:limit(Start, Range1) of - undefined -> - Acc; - Range -> - [{T, Range} | Acc] - end - end, [], TidRanges0), + %% now TidRanges are sorted old -> new, i.e the correct order of + %% processing + TidRanges = lists:foldl( + fun ({T, Range0}, []) -> + case ra_range:truncate(SnapIdx, Range0) of + undefined -> + []; + Range -> + [{T, Range}] + end; + ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> + Range1 = ra_range:truncate(SnapIdx, Range0), + case ra_range:limit(Start, Range1) of + undefined -> + Acc; + Range -> + [{T, Range} | Acc] + end + end, [], TidRanges0), SegRefs0 = lists:append( - [flush_mem_table_range(ServerUId, TidRange, State) - || {_Tid, Range} = TidRange <- TidRanges, - Range =/= undefined]), + lists:reverse( + %% segrefs are returned in appended order so new -> old + %% so we need to reverse them so that the final appended list + %% of segrefs is in the old -> new order + [flush_mem_table_range(ServerUId, TidRange, State) + || TidRange <- TidRanges])), %% compact cases where a segment was appended in a subsequent call to %% flush_mem_table_range + %% the list of segrefs is returned in new -> old order which is the same + %% order they are kept by the ra_log SegRefs = lists:reverse( lists:foldl( - fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) -> - %% same - [New | Rem]; + fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) -> + Acc; (Seg, Acc) -> [Seg | Acc] end, [], SegRefs0)), @@ -372,8 +384,8 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> StartIdx = start_index(UId, Idx), case Idx < StartIdx of true -> - %% TODO: we could combine registered check with this - %% although registered check could be slow + %% a snapshot must have been completed after we last checked + %% the start idx, continue flush from new start index. append_to_segment(UId, Tid, StartIdx, EndIdx, Seg0, Closed, State); false -> @@ -383,11 +395,11 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> [Idx, UId, StartIdx]), case ra_directory:is_registered_uid(State#state.system, UId) of true -> - ?ERROR("segment_writer: uid ~s is registered, exiting...", + ?ERROR("segment_writer: uid ~ts is registered, exiting...", [UId]), exit({missing_index, UId, Idx}); false -> - ?INFO("segment_writer: UId ~s was not registered, skipping", + ?INFO("segment_writer: uid ~ts was not registered, skipping", [UId]), undefined end diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 44d371c3..d552924b 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -214,6 +214,8 @@ receive_segment(Config) -> ok. delete_during_segment_flush(Config) -> + %% this test doesn't necessarily trigger the potential issue but is + %% worth keeping around Log0 = ra_log_init(Config), Data = crypto:strong_rand_bytes(4000), % write a few entries @@ -228,14 +230,21 @@ delete_during_segment_flush(Config) -> {PostWritten, _} = ra_log:last_written(L), PostWritten >= (PreWritten + 10000) end, 100), - _Ref = monitor(process, ra_log_segment_writer), + Ref = monitor(process, ra_log_segment_writer), % force wal roll over ok = ra_log_wal:force_roll_over(ra_log_wal), + timer:sleep(0), ra_log:delete_everything(Log2), - timer:sleep(1000), + receive + {'DOWN', Ref, _, _, _} -> + flush(), + ct:fail("segment writer unexpectedly exited") + after 100 -> + ok + end, flush(), ok. diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 25d4e1c6..6ae04fab 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -32,6 +32,8 @@ all_tests() -> accept_mem_tables_for_down_server, accept_mem_tables_with_deleted_server, accept_mem_tables_with_corrupt_segment, + accept_mem_tables_multiple_ranges, + accept_mem_tables_multiple_ranges_snapshot, truncate_segments, truncate_segments_with_pending_update, truncate_segments_with_pending_overwrite, @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) -> end, ok = gen_server:stop(Pid), ok. + accept_mem_tables_multi_segment_overwrite(Config) -> Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) -> ok = gen_server:stop(TblWriterPid), ok. +accept_mem_tables_multiple_ranges(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([ + {49, 64, _}, + {33, 48, _}, + {17, 32, _}, + {1, 16, _} + ], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + +accept_mem_tables_multiple_ranges_snapshot(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ets:insert(ra_log_snapshot_state, {UId, 64}), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + truncate_segments(Config) -> Dir = ?config(wal_dir, Config), SegConf = #{max_count => 12},