From 952b53018c610efdc99a271e9ea0b4a95d33ec67 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Sun, 1 Sep 2024 04:04:06 -0700 Subject: [PATCH 1/2] assign a new dsn instead of using the default value of 0 --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 6 ++-- .../replication/repl_dev/raft_repl_dev.cpp | 32 +++++++++++++------ .../repl_dev/raft_state_machine.cpp | 5 ++- src/tests/test_raft_repl_dev.cpp | 2 +- 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/conanfile.py b/conanfile.py index 96d080591..d6d24c8c0 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.54" + version = "6.4.55" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index ce88571e4..3e82f7aa3 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) { m_log_store->foreach ( to_store_lsn(index), [this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool { + size_t const total_entry_size = entry.size() + sizeof(uint32_t); if (remain_cnt-- > 0) { size_t avail_size = out_buf->size() - out_buf->pos(); - if (avail_size < entry.size() + sizeof(uint32_t)) { - avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t)); + // available size of packing buffer should be able to hold entry.size() and the length of this entry + if (avail_size < total_entry_size) { + avail_size += std::max(out_buf->size() * 2, total_entry_size); out_buf = nuraft::buffer::expand(*out_buf, avail_size); } REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur), diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 530cddff9..76c2ffdca 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -120,7 +120,19 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() { // Propose to the group to destroy auto rreq = repl_req_ptr_t(new repl_req_ctx{}); - rreq->init(repl_key{}, journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0); + + // if we have a rreq {originator=1, term=1, dsn=0, lsn=7} in follower and a baseline resync is triggerd before the + // rreq is committed in the follower, then the on_commit of the rreq will not be called and as a result this rreq + // will become a garbage rreq in this follower. now if we trigger a destroy_group, a new rreq {originator=1, term=1, + // dsn=0} will created in the follower since the default dsn of a repl_key is 0.after the log of this rreq is + // appended to log store and get a new lsn, if we link the new lsn to the old rreq (rreq is identified by + // {originator, term, dsn}) which has alread have a lsn, then a assert will be throw out. pls refer to + // repl_req_ctx::set_lsn + + // here, we set the dsn to a new one , which is definitely unique in the follower, so that the new rreq will not + // have a conflict with the old rreq. + rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}, + journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0); auto err = m_state_machine->propose_to_raft(std::move(rreq)); if (err != ReplServiceError::OK) { @@ -526,7 +538,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -1041,6 +1053,14 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, m_commit_upto_lsn.load(), raft_req->get_commit_idx()); + if (start_lsn != m_data_journal->next_slot()) { + // if the start_lsn of this batch does not match the next_slot, drop this log batch + // this will happen when the leader is sending the logs which are already received and appended + RD_LOGD("start_lsn={} does not match the next_slot={}, dropping the log batch", start_lsn, + m_data_journal->next_slot()); + return {true, nuraft::cb_func::ReturnCode::ReturnNull}; + } + if (!entries.empty()) { RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", entries.size()); @@ -1050,13 +1070,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } auto req = m_state_machine->localize_journal_entry_prepare(*entry); - // TODO :: we need to indentify whether this log entry should be appended to log store. - // 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we - // should skip it. - // 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been - // committed, we should skip it. - // here, we only check the first condition for now. revisit here if we need to check the second - if (req == nullptr || req->lsn() != -1) { + if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index ba30095ca..6e920b997 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -221,7 +221,10 @@ void RaftStateMachine::become_ready() { m_rd.become_ready(); } void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) { auto const it = m_lsn_req_map.find(lsn); - if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); } + if (it != m_lsn_req_map.cend()) { + RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string()); + m_lsn_req_map.erase(lsn); + } } void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) { diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index cc55187db..f2919c760 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test { // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at // raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now. if (i++ > 10 && !force_leave) { - LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave"); + LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave"); repl_dev->force_leave(); force_leave = true; } From 548121ba71cb20c84661418808885403867af16a Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Sun, 1 Sep 2024 21:05:45 -0700 Subject: [PATCH 2/2] address comments --- src/lib/replication/repl_dev/raft_repl_dev.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 76c2ffdca..4c358aa4e 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1053,14 +1053,6 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, m_commit_upto_lsn.load(), raft_req->get_commit_idx()); - if (start_lsn != m_data_journal->next_slot()) { - // if the start_lsn of this batch does not match the next_slot, drop this log batch - // this will happen when the leader is sending the logs which are already received and appended - RD_LOGD("start_lsn={} does not match the next_slot={}, dropping the log batch", start_lsn, - m_data_journal->next_slot()); - return {true, nuraft::cb_func::ReturnCode::ReturnNull}; - } - if (!entries.empty()) { RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", entries.size());