Skip to content

Commit

Permalink
flush log on leader as well.
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Jul 16, 2024
1 parent ac2da3d commit 3faa2d6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,27 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {

// Start fetch the batch of data for this lsn range from remote if its not available yet.
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
bool flush_log = false;
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
auto rreq = m_sm.lsn_to_req(lsn);
// Skip this call in proposer, since this method will synchronously flush the data, which is not required for
// leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
// high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
// config entries.
if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
reqs->emplace_back(std::move(rreq));
if ((rreq == nullptr) /*|| rreq->is_proposer()*/) { continue; }
else if (rreq->is_proposer()) { proposer_reqs->emplace_back(std::move(rreq)); }
else {reqs->emplace_back(std::move(rreq));}
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());

for (auto const& rreq : *reqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data before future wait: rreq=[{}]", rreq->to_compact_string());
}

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
if (!reqs->empty()) {
// Check the map if data corresponding to all of these requsts have been received and written. If not, schedule
Expand All @@ -73,8 +81,21 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
for (auto const& rreq : *reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}

for (auto const& rreq : *reqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data after future wait: rreq=[{}]", rreq->to_compact_string());
}
} else if (!proposer_reqs->empty()) {
LOGINFO("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn, count);
// Mark all the reqs also completely written
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
for (auto const& rreq : *proposer_reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
}

std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
if (rreq->is_proposer()) {
RD_LOGD("Leader Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
// This is the time to ensure flushing of journal happens in the proposer
if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
Expand Down

0 comments on commit 3faa2d6

Please sign in to comment.