From 497356b0aa20f81134f15717a7baf777076bb12d Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Tue, 3 Dec 2024 12:37:53 +0800 Subject: [PATCH] Implement get_next_batch_size_hint_in_bytes() we use the `byte` as `cnt` as of now. Also update the log_entries_ext() which will be called on leader, if hint < 0 means follower want nothing, return an empty vector so that an empty append_entries_req will be sent, to carry the commit_index update and trigger the follower to commit. if hint > 0, respect the cnt that the follower want, this is useful when two logs within same batch has dependency, we can exclude the dependent one. if hint = 0 means control by leader. Signed-off-by: Xiaoxi Chen --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 25 +++++++++++++------ .../replication/repl_dev/raft_repl_dev.cpp | 3 +++ .../repl_dev/raft_state_machine.cpp | 21 ++++++++++++++++ .../replication/repl_dev/raft_state_machine.h | 5 ++++ 5 files changed, 47 insertions(+), 9 deletions(-) diff --git a/conanfile.py b/conanfile.py index 99e129017..bc914e16c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.18" + version = "6.5.19" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index a9f7c0301..dcfcfe906 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -223,14 +223,23 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore: nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore::log_entries_ext(ulong start, ulong end, int64_t batch_size_hint_in_bytes) { - // in nuraft , batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more - // log entries ATM. here we just send one log entry if this happens which is helpful for nuobject case and no harm - // to other case. - if (batch_size_hint_in_bytes < 0) end = start + 1; - - // for the case where batch_size_hint_in_bytes >= 0, we do not take any size check here for now. - // TODO: limit the size of the returned entries by batch_size_hint_in_bytes int the future if necessary - return log_entries(start, end); + // WARNING: we interpret batch_size_hint_in_bytes as count as of now. + auto batch_size_hint_cnt = batch_size_hint_in_bytes; + auto new_end = end; + // batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more log entry. + if (batch_size_hint_cnt < 0) + new_end = start; + else if (batch_size_hint_cnt > 0) { + // limit to the hint, also prevent overflow by a huge batch_size_hint_cnt + new_end = std::max(start + (uint64_t)batch_size_hint_cnt, start); + // limit to original end + new_end = std::min(new_end, end); + } + DEBUG_ASSERT(new_end <= end, "new end {} should be <= original end {}", new_end, end); + DEBUG_ASSERT(start <= new_end, "start {} should be <= new_end {}", start, new_end); + REPL_STORE_LOG(TRACE, "log_entries_ext, start={} end={}, hint {}, adjusted range {} ~ {}, cnt {}", start, end, + batch_size_hint_cnt, start, new_end, new_end - start); + return log_entries(start, new_end); } nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 2d93c4070..4c0c433fe 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1261,6 +1261,8 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu auto req = m_state_machine->localize_journal_entry_prepare(*entry); if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); + // We are rejecting this log entry, meaning we can accept previous log entries. + m_state_machine->reset_next_batch_size_hint(i); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } reqs->emplace_back(std::move(req)); @@ -1275,6 +1277,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu } sisl::VectorPool< repl_req_ptr_t >::free(reqs); } + if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); } return {true, ret}; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 2047a3b28..994d2e514 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -249,6 +249,27 @@ void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params& m_rd.handle_rollback(rreq); } +int64_t RaftStateMachine::get_next_batch_size_hint_in_bytes() { + return next_batch_size_hint; +} + +int64_t RaftStateMachine::inc_next_batch_size_hint() { + constexpr int64_t next_batch_size_hint_limit = 16; + // set to minimal if previous hint is negative (i.e do not want any log) + if (next_batch_size_hint < 0) { + next_batch_size_hint = 1; + return next_batch_size_hint; + } + // Exponential growth till next_batch_size_hint_limit, set to 0 afterward means leader take control. + next_batch_size_hint = next_batch_size_hint * 2 > next_batch_size_hint_limit ? 0 : next_batch_size_hint; + return next_batch_size_hint; +} + +int64_t RaftStateMachine::reset_next_batch_size_hint(int64_t new_hint) { + next_batch_size_hint = new_hint; + return next_batch_size_hint; +} + void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) { for (auto [key, rreq] : m_lsn_req_map) { cb(key, rreq); diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 6bf4faf5a..fa899c40d 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -97,6 +97,7 @@ class RaftStateMachine : public nuraft::state_machine { nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft // iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle}; bool m_resync_mode{false}; + int64_t next_batch_size_hint{0}; public: RaftStateMachine(RaftReplDev& rd); @@ -112,6 +113,7 @@ class RaftStateMachine : public nuraft::state_machine { void rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) override; void rollback_ext(const nuraft::state_machine::ext_op_params& params) override; void become_ready(); + int64_t get_next_batch_size_hint_in_bytes() override; void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override; int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, @@ -134,8 +136,11 @@ class RaftStateMachine : public nuraft::state_machine { void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb); std::string rdev_name() const; + int64_t reset_next_batch_size_hint(int64_t new_hint); + int64_t inc_next_batch_size_hint(); private: + void after_precommit_in_leader(const nuraft::raft_server::req_ext_cb_params& params); };