diff --git a/conanfile.py b/conanfile.py index bc914e16c..c3dba8dc8 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.19" + version = "6.5.20" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index a9f7c0301..823ab62bc 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -223,14 +223,27 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore: nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore::log_entries_ext(ulong start, ulong end, int64_t batch_size_hint_in_bytes) { - // in nuraft , batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more - // log entries ATM. here we just send one log entry if this happens which is helpful for nuobject case and no harm - // to other case. - if (batch_size_hint_in_bytes < 0) end = start + 1; - - // for the case where batch_size_hint_in_bytes >= 0, we do not take any size check here for now. - // TODO: limit the size of the returned entries by batch_size_hint_in_bytes int the future if necessary - return log_entries(start, end); + // WARNING: we interpret batch_size_hint_in_bytes as count as of now. + auto batch_size_hint_cnt = batch_size_hint_in_bytes; + auto new_end = end; + // batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more log entry. + if (batch_size_hint_cnt < 0) + new_end = start; + else if (batch_size_hint_cnt > 0) { + // limit to the hint, also prevent overflow by a huge batch_size_hint_cnt + if (sisl_unlikely(start + (uint64_t)batch_size_hint_cnt < start)) { + new_end = end; + } else { + new_end = start + (uint64_t)batch_size_hint_cnt; + } + // limit to original end + new_end = std::min(new_end, end); + } + DEBUG_ASSERT(new_end <= end, "new end {} should be <= original end {}", new_end, end); + DEBUG_ASSERT(start <= new_end, "start {} should be <= new_end {}", start, new_end); + REPL_STORE_LOG(TRACE, "log_entries_ext, start={} end={}, hint {}, adjusted range {} ~ {}, cnt {}", start, end, + batch_size_hint_cnt, start, new_end, new_end - start); + return log_entries(start, new_end); } nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 72a39a27a..4be1aa78e 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1261,6 +1261,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu auto req = m_state_machine->localize_journal_entry_prepare(*entry); if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); + // The hint set here will be used by the next after next appendEntry, the next one + // always go with -1 from NuRraft code. + // + // We are rejecting this log entry, meaning we can accept previous log entries. + // If there is nothing we can accept(i==0), that maens we are waiting for commit + // of previous lsn, set it to 1 in this case. + m_state_machine->reset_next_batch_size_hint(std::max(1ul, i)); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } reqs->emplace_back(std::move(req)); @@ -1275,6 +1282,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu } sisl::VectorPool< repl_req_ptr_t >::free(reqs); } + if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); } return {true, ret}; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index b64a32c24..8909614a0 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -249,6 +249,25 @@ void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params& m_rd.handle_rollback(rreq); } +int64_t RaftStateMachine::get_next_batch_size_hint_in_bytes() { return next_batch_size_hint; } + +int64_t RaftStateMachine::inc_next_batch_size_hint() { + constexpr int64_t next_batch_size_hint_limit = 16; + // set to minimal if previous hint is negative (i.e do not want any log) + if (next_batch_size_hint < 0) { + next_batch_size_hint = 1; + return next_batch_size_hint; + } + // Exponential growth till next_batch_size_hint_limit, set to 0 afterward means leader take control. + next_batch_size_hint = next_batch_size_hint * 2 > next_batch_size_hint_limit ? 0 : next_batch_size_hint * 2; + return next_batch_size_hint; +} + +int64_t RaftStateMachine::reset_next_batch_size_hint(int64_t new_hint) { + next_batch_size_hint = new_hint; + return next_batch_size_hint; +} + void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) { for (auto [key, rreq] : m_lsn_req_map) { cb(key, rreq); diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 8f00cec43..2b50fea7b 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -101,6 +101,7 @@ class RaftStateMachine : public nuraft::state_machine { nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft // iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle}; bool m_resync_mode{false}; + int64_t next_batch_size_hint{0}; public: RaftStateMachine(RaftReplDev& rd); @@ -116,6 +117,7 @@ class RaftStateMachine : public nuraft::state_machine { void rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) override; void rollback_ext(const nuraft::state_machine::ext_op_params& params) override; void become_ready(); + int64_t get_next_batch_size_hint_in_bytes() override; void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override; int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, @@ -138,6 +140,8 @@ class RaftStateMachine : public nuraft::state_machine { void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb); std::string rdev_name() const; + int64_t reset_next_batch_size_hint(int64_t new_hint); + int64_t inc_next_batch_size_hint(); static bool is_hs_snp_obj(uint64_t obj_id) { return (obj_id & snp_obj_id_type_app) == 0; }