From 69f46216ed68da1f8f6a49124c81305131457eaf Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Thu, 29 Aug 2024 23:50:12 -0700 Subject: [PATCH] add lsn check before append entry --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 6 ++++-- src/lib/replication/repl_dev/raft_repl_dev.cpp | 16 ++++++++-------- src/tests/test_raft_repl_dev.cpp | 2 +- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/conanfile.py b/conanfile.py index 96d080591..d6d24c8c0 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.54" + version = "6.4.55" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index ce88571e4..3e82f7aa3 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) { m_log_store->foreach ( to_store_lsn(index), [this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool { + size_t const total_entry_size = entry.size() + sizeof(uint32_t); if (remain_cnt-- > 0) { size_t avail_size = out_buf->size() - out_buf->pos(); - if (avail_size < entry.size() + sizeof(uint32_t)) { - avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t)); + // available size of packing buffer should be able to hold entry.size() and the length of this entry + if (avail_size < total_entry_size) { + avail_size += std::max(out_buf->size() * 2, total_entry_size); out_buf = nuraft::buffer::expand(*out_buf, avail_size); } REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur), diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 530cddff9..12831f209 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -526,7 +526,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -1041,6 +1041,12 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, m_commit_upto_lsn.load(), raft_req->get_commit_idx()); + if (start_lsn != m_data_journal->next_slot()) { + // if the start_lsn of this batch does not match the next_slot, drop this log batch + // this will happen when the leader is sending the logs which are already received and appended + return {true, nuraft::cb_func::ReturnCode::ReturnNull}; + } + if (!entries.empty()) { RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", entries.size()); @@ -1050,13 +1056,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } auto req = m_state_machine->localize_journal_entry_prepare(*entry); - // TODO :: we need to indentify whether this log entry should be appended to log store. - // 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we - // should skip it. - // 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been - // committed, we should skip it. - // here, we only check the first condition for now. revisit here if we need to check the second - if (req == nullptr || req->lsn() != -1) { + if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index cc55187db..f2919c760 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test { // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at // raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now. if (i++ > 10 && !force_leave) { - LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave"); + LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); repl_dev->force_leave(); force_leave = true; }