Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement get_next_batch_size_hint_in_bytes() #599

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.19"
version = "6.5.20"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
29 changes: 21 additions & 8 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,27 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore:

nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > >
HomeRaftLogStore::log_entries_ext(ulong start, ulong end, int64_t batch_size_hint_in_bytes) {
// in nuraft , batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more
// log entries ATM. here we just send one log entry if this happens which is helpful for nuobject case and no harm
// to other case.
if (batch_size_hint_in_bytes < 0) end = start + 1;

// for the case where batch_size_hint_in_bytes >= 0, we do not take any size check here for now.
// TODO: limit the size of the returned entries by batch_size_hint_in_bytes int the future if necessary
return log_entries(start, end);
// WARNING: we interpret batch_size_hint_in_bytes as count as of now.
auto batch_size_hint_cnt = batch_size_hint_in_bytes;
auto new_end = end;
// batch_size_hint_in_bytes < 0 indicats that follower is busy now and do not want to receive any more log entry.
if (batch_size_hint_cnt < 0)
new_end = start;
else if (batch_size_hint_cnt > 0) {
// limit to the hint, also prevent overflow by a huge batch_size_hint_cnt
if (sisl_unlikely(start + (uint64_t)batch_size_hint_cnt < start)) {
new_end = end;
} else {
new_end = start + (uint64_t)batch_size_hint_cnt;
}
// limit to original end
new_end = std::min(new_end, end);
JacksonYao287 marked this conversation as resolved.
Show resolved Hide resolved
}
DEBUG_ASSERT(new_end <= end, "new end {} should be <= original end {}", new_end, end);
Copy link
Contributor

@yamingk yamingk Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: DEBUG_ASSERT_LE(new_ned, end); and let macro to print it is value when assert fail

DEBUG_ASSERT(start <= new_end, "start {} should be <= new_end {}", start, new_end);
REPL_STORE_LOG(TRACE, "log_entries_ext, start={} end={}, hint {}, adjusted range {} ~ {}, cnt {}", start, end,
batch_size_hint_cnt, start, new_end, new_end - start);
return log_entries(start, new_end);
}

nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) {
Expand Down
8 changes: 8 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
// The hint set here will be used by the next after next appendEntry, the next one
// always go with -1 from NuRraft code.
//
// We are rejecting this log entry, meaning we can accept previous log entries.
// If there is nothing we can accept(i==0), that maens we are waiting for commit
// of previous lsn, set it to 1 in this case.
m_state_machine->reset_next_batch_size_hint(std::max(1ul, i));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand if we are setting 1 here, how can we get batch_size_hint_cnt to be set as -1 at HomeRaftLogStore::log_entries_ext, so that we can apply empty append entries and only send commit to the follower?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
reqs->emplace_back(std::move(req));
Expand All @@ -1275,6 +1282,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); }
return {true, ret};
}

Expand Down
19 changes: 19 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,25 @@ void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params&
m_rd.handle_rollback(rreq);
}

int64_t RaftStateMachine::get_next_batch_size_hint_in_bytes() { return next_batch_size_hint; }

int64_t RaftStateMachine::inc_next_batch_size_hint() {
constexpr int64_t next_batch_size_hint_limit = 16;
// set to minimal if previous hint is negative (i.e do not want any log)
if (next_batch_size_hint < 0) {
next_batch_size_hint = 1;
return next_batch_size_hint;
}
// Exponential growth till next_batch_size_hint_limit, set to 0 afterward means leader take control.
next_batch_size_hint = next_batch_size_hint * 2 > next_batch_size_hint_limit ? 0 : next_batch_size_hint * 2;
return next_batch_size_hint;
}

int64_t RaftStateMachine::reset_next_batch_size_hint(int64_t new_hint) {
next_batch_size_hint = new_hint;
return next_batch_size_hint;
}

void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
for (auto [key, rreq] : m_lsn_req_map) {
cb(key, rreq);
Expand Down
4 changes: 4 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class RaftStateMachine : public nuraft::state_machine {
nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft
// iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle};
bool m_resync_mode{false};
int64_t next_batch_size_hint{0};

public:
RaftStateMachine(RaftReplDev& rd);
Expand All @@ -116,6 +117,7 @@ class RaftStateMachine : public nuraft::state_machine {
void rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) override;
void rollback_ext(const nuraft::state_machine::ext_op_params& params) override;
void become_ready();
int64_t get_next_batch_size_hint_in_bytes() override;

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
Expand All @@ -138,6 +140,8 @@ class RaftStateMachine : public nuraft::state_machine {
void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb);

std::string rdev_name() const;
int64_t reset_next_batch_size_hint(int64_t new_hint);
int64_t inc_next_batch_size_hint();

static bool is_hs_snp_obj(uint64_t obj_id) { return (obj_id & snp_obj_id_type_app) == 0; }

Expand Down
Loading