diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index a4ff6dd..0df8f11 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -759,39 +759,35 @@ stalled(_Type, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _PrevLogTerm, _Entr {keep_state, NewState}; stalled({call, From}, ?SNAPSHOT_AVAILABLE_COMMAND(Root, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos), - #raft_state{name = Name, data_dir = DataDir, log_view = View0, storage = Storage, + #raft_state{name = Name, log_view = View0, storage = Storage, current_term = CurrentTerm, last_applied = LastApplied} = State0) -> case SnapshotIndex > LastApplied orelse LastApplied =:= 0 of true -> - Path = filename:join(DataDir, ?SNAPSHOT_NAME(SnapshotIndex, SnapshotTerm)), - catch filelib:ensure_dir(Path), - case prim_file:rename(Root, Path) of - ok -> - try - ?LOG_NOTICE("Server[~0p, term ~0p, stalled] applying snapshot ~p:~p", - [Name, CurrentTerm, SnapshotIndex, SnapshotTerm], #{domain => [whatsapp, wa_raft]}), - ok = wa_raft_storage:open_snapshot(Storage, SnapshotPos), - {ok, View1} = wa_raft_log:reset(View0, SnapshotPos), - State1 = State0#raft_state{log_view = View1, last_applied = SnapshotIndex, commit_index = SnapshotIndex}, - State2 = load_config(State1), - ?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after installing snapshot at ~p:~p.", - [Name, CurrentTerm, SnapshotIndex, SnapshotTerm], #{domain => [whatsapp, wa_raft]}), - State3 = case SnapshotTerm > CurrentTerm of - true -> advance_term(?FUNCTION_NAME, SnapshotTerm, undefined, State2); - false -> State2 - end, - % At this point, we assume that we received some cluster membership configuration from - % our peer so it is safe to transition to an operational state. - {next_state, follower, State3, [{reply, From, ok}]} - after - % It is assumed that the loading of the snapshot will move the snapshot away or - % otherwise disassociate the storage state from the snapshot path. - catch file:del_dir_r(Path) - end; - {error, Reason} -> - ?LOG_WARNING("Server[~0p, term ~0p, stalled] failed to rename available snapshot ~p to ~p due to ~p", - [Name, CurrentTerm, Root, Path, Reason], #{domain => [whatsapp, wa_raft]}), + try + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] applying snapshot ~p:~p", + [Name, CurrentTerm, SnapshotIndex, SnapshotTerm], #{domain => [whatsapp, wa_raft]}), + ok = wa_raft_storage:open_snapshot(Storage, Root, SnapshotPos), + {ok, View1} = wa_raft_log:reset(View0, SnapshotPos), + State1 = State0#raft_state{log_view = View1, last_applied = SnapshotIndex, commit_index = SnapshotIndex}, + State2 = load_config(State1), + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after installing snapshot at ~p:~p.", + [Name, CurrentTerm, SnapshotIndex, SnapshotTerm], #{domain => [whatsapp, wa_raft]}), + State3 = case SnapshotTerm > CurrentTerm of + true -> advance_term(?FUNCTION_NAME, SnapshotTerm, undefined, State2); + false -> State2 + end, + % At this point, we assume that we received some cluster membership configuration from + % our peer so it is safe to transition to an operational state. + {next_state, follower, State3, [{reply, From, ok}]} + catch + _:Reason -> + ?LOG_WARNING("Server[~0p, term ~0p, stalled] failed to load available snapshot ~p due to ~p", + [Name, CurrentTerm, Root, Reason], #{domain => [whatsapp, wa_raft]}), {keep_state_and_data, {reply, From, {error, Reason}}} + after + % It is assumed that the loading of the snapshot will move the snapshot away or + % otherwise disassociate the storage state from the snapshot path. + catch file:del_dir_r(Root) end; false -> ?LOG_NOTICE("Server[~0p, term ~0p, stalled] ignoring available snapshot ~p:~p with index not past ours (~p)", diff --git a/src/wa_raft_storage.erl b/src/wa_raft_storage.erl index 9dc0335..47ac075 100644 --- a/src/wa_raft_storage.erl +++ b/src/wa_raft_storage.erl @@ -28,7 +28,7 @@ %% API -export([ open/1, - open_snapshot/2, + open_snapshot/3, create_snapshot/1, create_snapshot/2, delete_snapshot/2 @@ -329,9 +329,9 @@ cancel(ServiceRef) -> open(ServiceRef) -> gen_server:call(ServiceRef, open, ?RAFT_RPC_CALL_TIMEOUT()). --spec open_snapshot(ServiceRef :: pid() | atom(), LastAppliedPos :: wa_raft_log:log_pos()) -> ok | error(). -open_snapshot(ServiceRef, LastAppliedPos) -> - gen_server:call(ServiceRef, {snapshot_open, LastAppliedPos}, ?RAFT_STORAGE_CALL_TIMEOUT()). +-spec open_snapshot(ServiceRef :: pid() | atom(), SnapshotPath :: file:filename(), LastAppliedPos :: wa_raft_log:log_pos()) -> ok | error(). +open_snapshot(ServiceRef, SnapshotPath, LastAppliedPos) -> + gen_server:call(ServiceRef, {snapshot_open, SnapshotPath, LastAppliedPos}, ?RAFT_STORAGE_CALL_TIMEOUT()). -spec create_snapshot(ServiceRef :: pid() | atom()) -> {ok, Pos :: wa_raft_log:log_pos()} | error(). create_snapshot(ServiceRef) -> @@ -426,7 +426,7 @@ init(#raft_options{application = App, table = Table, partition = Partition, data snapshot_create | status | {snapshot_create, Name :: string()} | - {snapshot_open, LastAppliedPos :: wa_raft_log:log_pos()} | + {snapshot_open, Path :: file:filename(), LastAppliedPos :: wa_raft_log:log_pos()} | {read_metadata, Key :: metadata()} | label. handle_call(open, _From, #state{last_applied = LastApplied} = State) -> @@ -448,9 +448,8 @@ handle_call({snapshot_create, Name}, _From, State) -> Result = create_snapshot_impl(Name, State), {reply, Result, State}; -handle_call({snapshot_open, #raft_log_pos{index = LastIndex, term = LastTerm} = LogPos}, _From, #state{name = Name, root_dir = RootDir, module = Module, handle = Handle, last_applied = LastApplied} = State) -> +handle_call({snapshot_open, SnapshotPath, LogPos}, _From, #state{name = Name, module = Module, handle = Handle, last_applied = LastApplied} = State) -> ?LOG_NOTICE("Storage[~0p] replacing storage at ~0p with snapshot at ~0p.", [Name, LastApplied, LogPos], #{domain => [whatsapp, wa_raft]}), - SnapshotPath = filename:join(RootDir, ?SNAPSHOT_NAME(LastIndex, LastTerm)), case Module:storage_open_snapshot(SnapshotPath, LogPos, Handle) of {ok, NewHandle} -> {reply, ok, State#state{last_applied = LogPos, handle = NewHandle}}; {error, Reason} -> {reply, {error, Reason}, State}