From e8690411abf71bcba17c549f72f6abbcd60e92e4 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Fri, 30 Aug 2024 02:53:33 -0700 Subject: [PATCH] add lsn check before append entry --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 6 ++++-- src/lib/replication/repl_dev/raft_repl_dev.cpp | 18 ++++++++++-------- src/lib/replication/repl_dev/raft_repl_dev.h | 2 +- .../repl_dev/raft_state_machine.cpp | 5 ++++- src/tests/test_raft_repl_dev.cpp | 2 +- 6 files changed, 21 insertions(+), 14 deletions(-) diff --git a/conanfile.py b/conanfile.py index 96d080591..d6d24c8c0 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.54" + version = "6.4.55" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index ce88571e4..3e82f7aa3 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) { m_log_store->foreach ( to_store_lsn(index), [this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool { + size_t const total_entry_size = entry.size() + sizeof(uint32_t); if (remain_cnt-- > 0) { size_t avail_size = out_buf->size() - out_buf->pos(); - if (avail_size < entry.size() + sizeof(uint32_t)) { - avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t)); + // available size of packing buffer should be able to hold entry.size() and the length of this entry + if (avail_size < total_entry_size) { + avail_size += std::max(out_buf->size() * 2, total_entry_size); out_buf = nuraft::buffer::expand(*out_buf, avail_size); } REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur), diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 530cddff9..d9de7122c 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -526,7 +526,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -1041,6 +1041,14 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, m_commit_upto_lsn.load(), raft_req->get_commit_idx()); + if (start_lsn != m_data_journal->next_slot()) { + // if the start_lsn of this batch does not match the next_slot, drop this log batch + // this will happen when the leader is sending the logs which are already received and appended + RD_LOGD("start_lsn={} does not match the next_slot={}, dropping the log batch", start_lsn, + m_data_journal->next_slot()); + return {true, nuraft::cb_func::ReturnCode::ReturnNull}; + } + if (!entries.empty()) { RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", entries.size()); @@ -1050,13 +1058,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } auto req = m_state_machine->localize_journal_entry_prepare(*entry); - // TODO :: we need to indentify whether this log entry should be appended to log store. - // 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we - // should skip it. - // 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been - // committed, we should skip it. - // here, we only check the first condition for now. revisit here if we need to check the second - if (req == nullptr || req->lsn() != -1) { + if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 41594b528..8e703be1c 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -132,7 +132,7 @@ class RaftReplDev : public ReplDev, repl_lsn_t m_last_flushed_commit_lsn{0}; // LSN upto which it was flushed to persistent store iomgr::timer_handle_t m_sb_flush_timer_hdl; - std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry + std::atomic< uint64_t > m_next_dsn{1}; // Data Sequence Number that will keep incrementing for each data entry iomgr::timer_handle_t m_wait_data_timer_hdl{ iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown; diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index ba30095ca..6e920b997 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -221,7 +221,10 @@ void RaftStateMachine::become_ready() { m_rd.become_ready(); } void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) { auto const it = m_lsn_req_map.find(lsn); - if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); } + if (it != m_lsn_req_map.cend()) { + RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string()); + m_lsn_req_map.erase(lsn); + } } void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) { diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index cc55187db..f2919c760 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test { // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at // raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now. if (i++ > 10 && !force_leave) { - LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave"); + LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); repl_dev->force_leave(); force_leave = true; }