diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index fcc20c325..d1daa95de 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1191,6 +1191,29 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size()); auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); + auto last_commit_lsn = uint64_cast(get_last_commit_lsn()); + for (unsigned long i=0; i < entries.size(); i++) { + auto& entry = entries[i]; + auto lsn = start_lsn + i; + auto term = entry->get_term(); + if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } + if (entry->get_buf_ptr()->size() == 0) { continue; } + // skipping localize for already committed log(dup), they anyway will be discard + // by nuraft before append_log. + if (lsn <= last_commit_lsn) { + RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", + term, lsn, last_commit_lsn); + continue; + } + // Those LSNs already in logstore but not yet committed, will be dedup here, + // applier_create_req will return same req as previous one + auto req = m_state_machine->localize_journal_entry_prepare(*entry); + if (req == nullptr) { + sisl::VectorPool< repl_req_ptr_t >::free(reqs); + return {true, nuraft::cb_func::ReturnCode::ReturnNull}; + } + reqs->emplace_back(std::move(req)); + } for (auto& entry : entries) { if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; }