diff --git a/src/ra_log.erl b/src/ra_log.erl index ab6406e7..8a490344 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -683,8 +683,8 @@ handle_event({segments, TidRanges, NewSegs}, readers = Readers } = State0) -> Reader = ra_log_reader:update_segments(NewSegs, Reader0), - %% the tid ranges arrive in the order they were written so we need to - %% foldr here to process the oldest first + %% the tid ranges arrive in the reverse order they were written + %% (new -> old) so we need to foldr here to process the oldest first Mt = lists:foldr( fun ({Tid, Range}, Acc0) -> {Spec, Acc} = ra_mt:record_flushed(Tid, Range, Acc0), diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index 1e618bc3..e73a7bfe 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -158,8 +158,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, T1 = erlang:monotonic_time(), _ = [begin {_, Failures} = ra_lib:partition_parallel( - fun (E) -> - ok = flush_mem_table_ranges(E, State), + fun (TidRange) -> + ok = flush_mem_table_ranges(TidRange, State), true end, Tabs, infinity), case Failures of @@ -253,34 +253,46 @@ get_overview(#state{data_dir = Dir, flush_mem_table_ranges({ServerUId, TidRanges0}, #state{system = System} = State) -> SnapIdx = snap_idx(ServerUId), + %% TidRanges arrive here sorted new -> old. %% truncate and limit all ranges to create a contiguous non-overlapping %% list of tid ranges to flush to disk - TidRanges = - lists:foldl(fun ({T, Range}, []) -> - [{T, ra_range:truncate(SnapIdx, Range)}]; - ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> - Range1 = ra_range:truncate(SnapIdx, Range0), - case ra_range:limit(Start, Range1) of - undefined -> - Acc; - Range -> - [{T, Range} | Acc] - end - end, [], TidRanges0), + %% now TidRanges are sorted old -> new, i.e the correct order of + %% processing + TidRanges = lists:foldl( + fun ({T, Range0}, []) -> + case ra_range:truncate(SnapIdx, Range0) of + undefined -> + []; + Range -> + [{T, Range}] + end; + ({T, Range0}, [{_T, {Start, _}} | _] = Acc) -> + Range1 = ra_range:truncate(SnapIdx, Range0), + case ra_range:limit(Start, Range1) of + undefined -> + Acc; + Range -> + [{T, Range} | Acc] + end + end, [], TidRanges0), SegRefs0 = lists:append( - [flush_mem_table_range(ServerUId, TidRange, State) - || {_Tid, Range} = TidRange <- TidRanges, - Range =/= undefined]), + lists:reverse( + %% segrefs are returned in appended order so new -> old + %% so we need to reverse them so that the final appended list + %% of segrefs is in the old -> new order + [flush_mem_table_range(ServerUId, TidRange, State) + || TidRange <- TidRanges])), %% compact cases where a segment was appended in a subsequent call to %% flush_mem_table_range + %% the list of segrefs is returned in new -> old order which is the same + %% order they are kept by the ra_log SegRefs = lists:reverse( lists:foldl( - fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) -> - %% same - [New | Rem]; + fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) -> + Acc; (Seg, Acc) -> [Seg | Acc] end, [], SegRefs0)), @@ -350,9 +362,7 @@ send_segments(System, ServerUId, TidRanges, SegRefs) -> [ServerUId, "No Pid"]), %% delete from the memtable on the non-running server's behalf [begin - %% this looks a bit weird but - %% we dont need full init to run a delete - _ = ra_mt:delete({range, Tid, Range}) + _ = catch ra_mt:delete({range, Tid, Range}) end || {Tid, Range} <- TidRanges], ok; Pid -> @@ -360,16 +370,6 @@ send_segments(System, ServerUId, TidRanges, SegRefs) -> ok end. -clean_closed_mem_tables(System, UId, Tid) -> - {ok, ClosedTbl} = ra_system:lookup_name(System, closed_mem_tbls), - Tables = ets:lookup(ClosedTbl, UId), - [begin - ?DEBUG("~w: cleaning closed table for '~ts' range: ~b-~b", - [?MODULE, UId, From, To]), - %% delete the entry in the closed table lookup - true = ets:delete_object(ClosedTbl, O) - end || {_, _, From, To, T} = O <- Tables, T == Tid]. - append_to_segment(UId, Tid, StartIdx0, EndIdx, Seg, State) -> StartIdx = start_index(UId, StartIdx0), % EndIdx + 1 because FP @@ -379,13 +379,13 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State) when StartIdx >= EndIdx -> {Seg, Closed}; append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> - case ets:lookup(Tid, Idx) of + try ets:lookup(Tid, Idx) of [] -> StartIdx = start_index(UId, Idx), case Idx < StartIdx of true -> - %% TODO: we could combine registered check with this - %% although registered check could be slow + %% a snapshot must have been completed after we last checked + %% the start idx, continue flush from new start index. append_to_segment(UId, Tid, StartIdx, EndIdx, Seg0, Closed, State); false -> @@ -395,11 +395,11 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> [Idx, UId, StartIdx]), case ra_directory:is_registered_uid(State#state.system, UId) of true -> - ?ERROR("segment_writer: uid ~s is registered, exiting...", + ?ERROR("segment_writer: uid ~ts is registered, exiting...", [UId]), exit({missing_index, UId, Idx}); false -> - ?INFO("segment_writer: UId ~s was not registered, skipping", + ?INFO("segment_writer: uid ~ts was not registered, skipping", [UId]), undefined end @@ -422,8 +422,6 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> undefined -> %% a successor cannot be opened - this is most likely due %% to the directory having been deleted. - %% clear close mem tables here - _ = clean_closed_mem_tables(State#state.system, UId, Tid), undefined; Seg -> ok = counters:add(State#state.counter, ?C_SEGMENTS, 1), @@ -437,6 +435,16 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) -> FileName = ra_log_segment:filename(Seg0), exit({segment_writer_append_error, FileName, Posix}) end + catch _:badarg -> + ?ERROR("segment_writer: uid ~s ets table deleted", [UId]), + %% ets table has been deleted. + %% this could be due to two reasons + %% 1. the ra server has been deleted. + %% 2. an old mem table has been deleted due to snapshotting + %% but the member is still active + %% skipping this table + undefined + end. find_segment_files(Dir) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 9f3113f6..8f4731e0 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1761,7 +1761,9 @@ gen_statem_safe_call(ServerId, Msg, Timeout) -> exit:{{nodedown, _}, _} -> {error, nodedown}; exit:{shutdown, _} -> - {error, shutdown} + {error, shutdown}; + exit:{Reason, _} -> + {error, Reason} end. do_state_query(QueryName, #state{server_state = State}) -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 1a4a8784..d552924b 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -23,6 +23,7 @@ all_tests() -> handle_overwrite, handle_overwrite_append, receive_segment, + delete_during_segment_flush, read_one, take_after_overwrite_and_init, validate_sequential_fold, @@ -212,6 +213,42 @@ receive_segment(Config) -> ra_log:close(FinalLog), ok. +delete_during_segment_flush(Config) -> + %% this test doesn't necessarily trigger the potential issue but is + %% worth keeping around + Log0 = ra_log_init(Config), + Data = crypto:strong_rand_bytes(4000), + % write a few entries + Entries = [{I, 1, Data} || I <- lists:seq(1, 100000)], + + {PreWritten, _} = ra_log:last_written(Log0), + Log1 = lists:foldl(fun(E, Acc0) -> + ra_log:append(E, Acc0) + end, Log0, Entries), + Log2 = deliver_log_events_cond( + Log1, fun (L) -> + {PostWritten, _} = ra_log:last_written(L), + PostWritten >= (PreWritten + 10000) + end, 100), + Ref = monitor(process, ra_log_segment_writer), + % force wal roll over + ok = ra_log_wal:force_roll_over(ra_log_wal), + + timer:sleep(0), + ra_log:delete_everything(Log2), + + + receive + {'DOWN', Ref, _, _, _} -> + flush(), + ct:fail("segment writer unexpectedly exited") + after 100 -> + ok + end, + flush(), + + ok. + read_one(Config) -> ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}), diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 25d4e1c6..6ae04fab 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -32,6 +32,8 @@ all_tests() -> accept_mem_tables_for_down_server, accept_mem_tables_with_deleted_server, accept_mem_tables_with_corrupt_segment, + accept_mem_tables_multiple_ranges, + accept_mem_tables_multiple_ranges_snapshot, truncate_segments, truncate_segments_with_pending_update, truncate_segments_with_pending_overwrite, @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) -> end, ok = gen_server:stop(Pid), ok. + accept_mem_tables_multi_segment_overwrite(Config) -> Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) -> ok = gen_server:stop(TblWriterPid), ok. +accept_mem_tables_multiple_ranges(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([ + {49, 64, _}, + {33, 48, _}, + {17, 32, _}, + {1, 16, _} + ], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + +accept_mem_tables_multiple_ranges_snapshot(Config)-> + Dir = ?config(wal_dir, Config), + SegConf = #{max_count => 16}, + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, + name => ?SEGWR, + data_dir => Dir, + segment_conf => SegConf}), + UId = ?config(uid, Config), + Entries = [{N, 42, N} || N <- lists:seq(1, 32)], + Mt = make_mem_table(UId, Entries), + Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)], + Mt2 = make_mem_table(UId, Entries2), + Ranges = #{UId => [ + {ra_mt:tid(Mt2), ra_mt:range(Mt2)}, + {ra_mt:tid(Mt), ra_mt:range(Mt)} + ]}, + ets:insert(ra_log_snapshot_state, {UId, 64}), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, + make_wal(Config, "w1.wal")), + + receive + {ra_log_event, {segments, _TidRanges, SegRefs}} -> + ?assertMatch([], SegRefs), + ok + after 3000 -> + flush(), + throw(ra_log_event_timeout) + end, + ok = gen_server:stop(TblWriterPid), + ok. + truncate_segments(Config) -> Dir = ?config(wal_dir, Config), SegConf = #{max_count => 12},