diff --git a/src/ra_log.erl b/src/ra_log.erl index d5faebdb..97f4e3ce 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -133,11 +133,13 @@ -type overview() :: #{type := ra_log, last_index := ra_index(), + last_term := ra_term(), first_index := ra_index(), last_written_index_term := ra_idxterm(), num_segments := non_neg_integer(), open_segments => non_neg_integer(), snapshot_index => undefined | ra_index(), + snapshot_term => undefined | ra_index(), mem_table_size => non_neg_integer(), latest_checkpoint_index => undefined | ra_index(), atom() => term()}. @@ -213,10 +215,8 @@ init(#{uid := UId, {DeleteSpecs, Mt} = ra_mt:set_first(FirstIdx, Mt0), ok = exec_mem_table_delete(Names, UId, DeleteSpecs), - Reader0 = ra_log_reader:init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs, - Names, Counter), - %% TODO: can there be obsolete segments returned here? - {Reader, []} = ra_log_reader:update_first_index(FirstIdx, Reader0), + Reader = ra_log_reader:init(UId, Dir, MaxOpen, AccessPattern, SegRefs, + Names, Counter), %% assert there is no gap between the snapshot %% and the first index in the log case (FirstIdx - SnapIdx) > 1 of @@ -237,13 +237,13 @@ init(#{uid := UId, counter = Counter, names = Names}, State0 = #?MODULE{cfg = Cfg, - first_index = max(SnapIdx + 1, FirstIdx), - last_index = max(SnapIdx, LastIdx0), - reader = Reader, - mem_table = Mt, - snapshot_state = SnapshotState, - last_wal_write = {whereis(Wal), now_ms()} - }, + first_index = max(SnapIdx + 1, FirstIdx), + last_index = max(SnapIdx, LastIdx0), + reader = Reader, + mem_table = Mt, + snapshot_state = SnapshotState, + last_wal_write = {whereis(Wal), now_ms()} + }, put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), LastIdx = State0#?MODULE.last_index, put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), @@ -273,7 +273,7 @@ init(#{uid := UId, LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of {ok, undefined} -> %% take last segref index - LastSegRefIdx; + max(SnapIdx, LastSegRefIdx); {ok, Idx} -> max(Idx, LastSegRefIdx); {error, wal_down} -> @@ -281,7 +281,12 @@ init(#{uid := UId, [State2#?MODULE.cfg#cfg.log_id]), exit(wal_down) end, - {LastWrittenTerm, State3} = fetch_term(LastWrittenIdx, State2), + {LastWrittenTerm, State3} = case LastWrittenIdx of + SnapIdx -> + {SnapTerm, State2}; + _ -> + fetch_term(LastWrittenIdx, State2) + end, LastTerm = ra_lib:default(LastTerm0, -1), State4 = State3#?MODULE{last_term = LastTerm, @@ -292,10 +297,13 @@ init(#{uid := UId, % and an empty meta data map State = maybe_append_first_entry(State4), ?DEBUG("~ts: ra_log:init recovered last_index_term ~w" - " first index ~b", + " snapshot_index_term ~w, last_written_index_term ~w", [State#?MODULE.cfg#cfg.log_id, last_index_term(State), - State#?MODULE.first_index]), + {SnapIdx, SnapTerm}, + State#?MODULE.last_written_index_term + ]), + ?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]), element(1, delete_segments(SnapIdx, State)). -spec close(state()) -> ok. @@ -936,6 +944,7 @@ exists({Idx, Term}, Log0) -> -spec overview(state()) -> overview(). overview(#?MODULE{last_index = LastIndex, + last_term = LastTerm, first_index = FirstIndex, last_written_index_term = LWIT, snapshot_state = SnapshotState, @@ -943,16 +952,22 @@ overview(#?MODULE{last_index = LastIndex, last_wal_write = {_LastPid, LastMs}, mem_table = Mt }) -> + CurrSnap = ra_snapshot:current(SnapshotState), #{type => ?MODULE, last_index => LastIndex, + last_term => LastTerm, first_index => FirstIndex, last_written_index_term => LWIT, num_segments => length(ra_log_reader:segment_refs(Reader)), open_segments => ra_log_reader:num_open_segments(Reader), - snapshot_index => case ra_snapshot:current(SnapshotState) of + snapshot_index => case CurrSnap of undefined -> undefined; {I, _} -> I end, + snapshot_term => case CurrSnap of + undefined -> undefined; + {_, T} -> T + end, latest_checkpoint_index => case ra_snapshot:latest_checkpoint(SnapshotState) of undefined -> undefined; @@ -1026,14 +1041,13 @@ release_resources(MaxOpenSegments, directory = Dir, counter = Counter, names = Names}, - first_index = FstIdx, reader = Reader} = State) -> ActiveSegs = ra_log_reader:segment_refs(Reader), % close all open segments % deliberately ignoring return value _ = ra_log_reader:close(Reader), %% open a new segment with the new max open segment value - State#?MODULE{reader = ra_log_reader:init(UId, Dir, FstIdx, MaxOpenSegments, + State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments, AccessPattern, ActiveSegs, Names, Counter)}. @@ -1042,11 +1056,10 @@ release_resources(MaxOpenSegments, register_reader(Pid, #?MODULE{cfg = #cfg{uid = UId, directory = Dir, names = Names}, - first_index = Idx, reader = Reader, readers = Readers} = State) -> SegRefs = ra_log_reader:segment_refs(Reader), - NewReader = ra_log_reader:init(UId, Dir, Idx, 1, SegRefs, Names), + NewReader = ra_log_reader:init(UId, Dir, 1, SegRefs, Names), {State#?MODULE{readers = [Pid | Readers]}, [{reply, {ok, NewReader}}, {monitor, process, log, Pid}]}. diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index e6705bad..902c51bf 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -9,8 +9,8 @@ -compile(inline_list_funcs). -export([ - init/6, - init/8, + init/5, + init/7, close/1, update_segments/2, handle_log_update/2, @@ -37,7 +37,7 @@ -type segment_ref() :: {From :: ra_index(), To :: ra_index(), File :: string()}. -record(?STATE, {cfg :: #cfg{}, - first_index = 0 :: ra_index(), + % first_index = 0 :: ra_index(), last_index = 0 :: ra:index(), segment_refs = [] :: [segment_ref()], open_segments :: ra_flru:state() @@ -52,16 +52,16 @@ %% PUBLIC --spec init(ra_uid(), file:filename(), ra_index(), non_neg_integer(), +-spec init(ra_uid(), file:filename(), non_neg_integer(), [segment_ref()], ra_system:names()) -> state(). -init(UId, Dir, FirstIdx, MaxOpen, SegRefs, Names) -> - init(UId, Dir, FirstIdx, MaxOpen, random, SegRefs, Names, undefined). +init(UId, Dir, MaxOpen, SegRefs, Names) -> + init(UId, Dir, MaxOpen, random, SegRefs, Names, undefined). --spec init(ra_uid(), file:filename(), ra_index(), non_neg_integer(), +-spec init(ra_uid(), file:filename(), non_neg_integer(), access_pattern(), [segment_ref()], ra_system:names(), undefined | counters:counters_ref()) -> state(). -init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs, #{}, Counter) +init(UId, Dir, MaxOpen, AccessPattern, SegRefs, #{}, Counter) when is_binary(UId) -> Cfg = #cfg{uid = UId, counter = Counter, @@ -79,7 +79,7 @@ init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs, #{}, Counter) end, #?STATE{cfg = Cfg, open_segments = ra_flru:new(MaxOpen, FlruHandler), - first_index = FirstIdx, + % first_index = FirstIdx, last_index = LastIdx, segment_refs = SegRefs}. @@ -107,7 +107,7 @@ update_segments(NewSegmentRefs, -spec handle_log_update({ra_log_update, undefined | pid(), ra_index(), [segment_ref()]}, state()) -> state(). -handle_log_update({ra_log_update, From, FstIdx, SegRefs}, +handle_log_update({ra_log_update, From, _FstIdx, SegRefs}, #?STATE{open_segments = Open0} = State) -> Open = ra_flru:evict_all(Open0), case From of @@ -117,7 +117,7 @@ handle_log_update({ra_log_update, From, FstIdx, SegRefs}, From ! ra_log_update_processed end, State#?MODULE{segment_refs = SegRefs, - first_index = FstIdx, + % first_index = FstIdx, open_segments = Open}. -spec update_first_index(ra_index(), state()) -> @@ -140,7 +140,7 @@ update_first_index(FstIdx, #?STATE{segment_refs = SegRefs0, end end, OpenSegs0, ObsoleteKeys), {State#?STATE{open_segments = OpenSegs, - first_index = FstIdx, + % first_index = FstIdx, segment_refs = Active}, Obsolete} end. diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index b5b551ba..1048a503 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -824,7 +824,16 @@ recover_records(#conf{names = Names} = Conf, Fd, recover_records(Conf, Fd, Chunk, Cache, State) end; ok -> - recover_records(Conf, Fd, Rest, Cache, State0); + %% best the the snapshot index as the last + %% writer index + Writers = case State0#recovery.writers of + #{UId := {in_seq, SnapIdx}} = W -> + W; + W -> + W#{UId => {in_seq, SnapIdx}} + end, + recover_records(Conf, Fd, Rest, Cache, + State0#recovery{writers = Writers}); error -> ?DEBUG("WAL: record failed CRC check. If this is the last record" " recovery can resume", []), diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 7f6dfa44..007cca2f 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -59,6 +59,7 @@ all_tests() -> sparse_read_out_of_range_2, written_event_after_snapshot, writes_lower_than_snapshot_index_are_dropped, + recover_after_snapshot, updated_segment_can_be_read, open_segments_limit, %% TODO mt: do or deprecate in current minor @@ -476,6 +477,26 @@ written_event_after_snapshot(Config) -> % false = filelib:is_file(Snap1), ok. + +recover_after_snapshot(Config) -> + Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}), + Log1 = ra_log:append({1, 1, <<"one">>}, Log0), + Log2 = ra_log:append({2, 1, <<"two">>}, Log1), + {Log3, _} = ra_log:update_release_cursor(2, #{}, 1, + <<"one+two">>, Log2), + Log4 = deliver_all_log_events(Log3, 100), + ra_log:close(Log4), + restart_wal(), + timer:sleep(1000), + Log = ra_log_init(Config, #{min_snapshot_interval => 1}), + Overview = ra_log:overview(Log), + ra_log:close(Log), + ?assertMatch(#{last_index := 2, + last_term := 1, + snapshot_index := 2, + last_written_index_term := {2, 1}}, Overview), + ok. + writes_lower_than_snapshot_index_are_dropped(Config) -> logger:set_primary_config(level, debug), Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),