From 3faa2d6b1e9c29700be145c73a163d60f92ae482 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Thu, 11 Jul 2024 10:02:17 -0700 Subject: [PATCH] flush log on leader as well. Signed-off-by: Xiaoxi Chen --- .../replication/log_store/repl_log_store.cpp | 25 +++++++++++++++++-- .../repl_dev/raft_state_machine.cpp | 1 + 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index c678a90d4..65a88a772 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -41,19 +41,27 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { // Start fetch the batch of data for this lsn range from remote if its not available yet. auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); + auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); + bool flush_log = false; for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { auto rreq = m_sm.lsn_to_req(lsn); // Skip this call in proposer, since this method will synchronously flush the data, which is not required for // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft // config entries. - if ((rreq == nullptr) || rreq->is_proposer()) { continue; } - reqs->emplace_back(std::move(rreq)); + if ((rreq == nullptr) /*|| rreq->is_proposer()*/) { continue; } + else if (rreq->is_proposer()) { proposer_reqs->emplace_back(std::move(rreq)); } + else {reqs->emplace_back(std::move(rreq));} } RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count, reqs->size()); + for (auto const& rreq : *reqs) { + if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; } + LOGINFO("Raft Channel: Data before future wait: rreq=[{}]", rreq->to_compact_string()); + } + // All requests are from proposer for data write, so as mentioned above we can skip the flush for now if (!reqs->empty()) { // Check the map if data corresponding to all of these requsts have been received and written. If not, schedule @@ -73,8 +81,21 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { for (auto const& rreq : *reqs) { if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); } } + + for (auto const& rreq : *reqs) { + if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; } + LOGINFO("Raft Channel: Data after future wait: rreq=[{}]", rreq->to_compact_string()); + } + } else if (!proposer_reqs->empty()) { + LOGINFO("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn, count); + // Mark all the reqs also completely written + HomeRaftLogStore::end_of_append_batch(start_lsn, count); + for (auto const& rreq : *proposer_reqs) { + if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); } + } } sisl::VectorPool< repl_req_ptr_t >::free(reqs); + sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs); } std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index a805b229b..a341346b4 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -190,6 +190,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params if (!rreq) { RD_LOGD("Raft channel got null rreq"); } RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); if (rreq->is_proposer()) { + RD_LOGD("Leader Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); // This is the time to ensure flushing of journal happens in the proposer if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); } rreq->add_state(repl_req_state_t::LOG_FLUSHED);