Skip to content

Commit

Permalink
Implement get_next_batch_size_hint_in_bytes()
Browse files Browse the repository at this point in the history
we use the `byte` as `cnt` as of now.

Also update the log_entries_ext() which will be called on leader,

if hint < 0 means follower want nothing, return an empty vector
so that an empty append_entries_req will be sent, to carry the
commit_index update and trigger the follower to commit.

if hint > 0, respect the cnt that the follower want,  this is useful
when two logs within same batch has dependency, we can exclude the
dependent one.

if hint = 0 means control by leader.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Dec 4, 2024
1 parent 7811855 commit 8fa4f6b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 9 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.18"
version = "6.5.19"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
29 changes: 21 additions & 8 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,27 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore:

nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > >
HomeRaftLogStore::log_entries_ext(ulong start, ulong end, int64_t batch_size_hint_in_bytes) {
// in nuraft , batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more
// log entries ATM. here we just send one log entry if this happens which is helpful for nuobject case and no harm
// to other case.
if (batch_size_hint_in_bytes < 0) end = start + 1;

// for the case where batch_size_hint_in_bytes >= 0, we do not take any size check here for now.
// TODO: limit the size of the returned entries by batch_size_hint_in_bytes int the future if necessary
return log_entries(start, end);
// WARNING: we interpret batch_size_hint_in_bytes as count as of now.
auto batch_size_hint_cnt = batch_size_hint_in_bytes;
auto new_end = end;
// batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more log entry.
if (batch_size_hint_cnt < 0)
new_end = start;
else if (batch_size_hint_cnt > 0) {
// limit to the hint, also prevent overflow by a huge batch_size_hint_cnt
if (sisl_unlikely(start + (uint64_t)batch_size_hint_cnt < start)) {
new_end = end;
} else {
new_end = start + (uint64_t)batch_size_hint_cnt;
}
// limit to original end
new_end = std::min(new_end, end);
}
DEBUG_ASSERT(new_end <= end, "new end {} should be <= original end {}", new_end, end);
DEBUG_ASSERT(start <= new_end, "start {} should be <= new_end {}", start, new_end);
REPL_STORE_LOG(TRACE, "log_entries_ext, start={} end={}, hint {}, adjusted range {} ~ {}, cnt {}", start, end,
batch_size_hint_cnt, start, new_end, new_end - start);
return log_entries(start, new_end);
}

nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) {
Expand Down
8 changes: 8 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
// The hint set here will be used by the next after next appendEntry, the next one
// always go with -1 from NuRraft code.
//
// We are rejecting this log entry, meaning we can accept previous log entries.
// If there is nothing we can accept(i==0), that maens we are waiting for commit
// of previous lsn, set it to 1 in this case.
m_state_machine->reset_next_batch_size_hint(std::max(1ul, i));
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
reqs->emplace_back(std::move(req));
Expand All @@ -1275,6 +1282,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); }
return {true, ret};
}

Expand Down
19 changes: 19 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,25 @@ void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params&
m_rd.handle_rollback(rreq);
}

int64_t RaftStateMachine::get_next_batch_size_hint_in_bytes() { return next_batch_size_hint; }

int64_t RaftStateMachine::inc_next_batch_size_hint() {
constexpr int64_t next_batch_size_hint_limit = 16;
// set to minimal if previous hint is negative (i.e do not want any log)
if (next_batch_size_hint < 0) {
next_batch_size_hint = 1;
return next_batch_size_hint;
}
// Exponential growth till next_batch_size_hint_limit, set to 0 afterward means leader take control.
next_batch_size_hint = next_batch_size_hint * 2 > next_batch_size_hint_limit ? 0 : next_batch_size_hint * 2;
return next_batch_size_hint;
}

int64_t RaftStateMachine::reset_next_batch_size_hint(int64_t new_hint) {
next_batch_size_hint = new_hint;
return next_batch_size_hint;
}

void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
for (auto [key, rreq] : m_lsn_req_map) {
cb(key, rreq);
Expand Down
4 changes: 4 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class RaftStateMachine : public nuraft::state_machine {
nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft
// iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle};
bool m_resync_mode{false};
int64_t next_batch_size_hint{0};

public:
RaftStateMachine(RaftReplDev& rd);
Expand All @@ -112,6 +113,7 @@ class RaftStateMachine : public nuraft::state_machine {
void rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) override;
void rollback_ext(const nuraft::state_machine::ext_op_params& params) override;
void become_ready();
int64_t get_next_batch_size_hint_in_bytes() override;

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
Expand All @@ -134,6 +136,8 @@ class RaftStateMachine : public nuraft::state_machine {
void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb);

std::string rdev_name() const;
int64_t reset_next_batch_size_hint(int64_t new_hint);
int64_t inc_next_batch_size_hint();

private:
void after_precommit_in_leader(const nuraft::raft_server::req_ext_cb_params& params);
Expand Down

0 comments on commit 8fa4f6b

Please sign in to comment.