Skip to content

Commit

Permalink
PushData only pushed to active followers.
Browse files Browse the repository at this point in the history
If a follower is lagging too far, do not flood it with data
from new IOs (new rreq, new LSNs) ,  reserve the capability
for catching up,  that follower can request data via FetchData.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Nov 7, 2024
1 parent 3882211 commit 07082de
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 17 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.8"
version = "6.5.9"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ table Consensus {

// Log difference to determine if the follower is in resync mode
resync_log_idx_threshold: int64 = 100;

// Log difference from leader's point of view, to determine if the follower is laggy.
laggy_threshold: int64 = 2000;
}

table HomeStoreSettings {
Expand Down
58 changes: 42 additions & 16 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,23 +363,30 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance())
.thenValue([this, rreq = std::move(rreq)](auto e) {
if (e.hasError()) {
RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", rreq->to_string(),
e.error());
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
return;
auto peers = get_active_peers();
auto calls = std::vector< nuraft_mesg::NullAsyncResult >();
for (auto peer : peers) {
RD_LOGD("Data Channel: Pushing data to follower {}, rreq=[{}]", peer, rreq->to_string());
calls.push_back(group_msg_service()
->data_service_request_unidirectional(peer, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance()));
}
folly::collectAllUnsafe(calls).thenValue([this, rreq](auto&& v_res) {
for (auto const& res : v_res) {
if (sisl_likely(res.value())) {
auto r = res.value();
if (r.hasError()) {
// Just logging PushData error, no action is needed as follower can try by fetchData.
RD_LOGW("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}",
rreq->to_string(), r.error());
}
}
// Release the buffer which holds the packets
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
}
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
// Release the buffer which holds the packets
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
}

void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
Expand Down Expand Up @@ -1020,6 +1027,25 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {
return pi;
}

std::set< replica_id_t > RaftReplDev::get_active_peers() const {
auto repl_status = get_replication_status();
std::set< replica_id_t > res;
auto my_committed_idx = m_commit_upto_lsn.load();
uint64_t lagThreshold = my_committed_idx > HS_DYNAMIC_CONFIG(consensus.laggy_threshold)
? my_committed_idx - HS_DYNAMIC_CONFIG(consensus.laggy_threshold)
: 0;
for (auto p : repl_status) {
if (p.id_ == m_my_repl_id) { continue; }
if (p.replication_idx_ >= lagThreshold) {
res.insert(p.id_);
} else {
RD_LOGW("Excluding peer {} from active_peers, lag {}, my lsn {}, peer lsn {}", p.id_,
my_committed_idx - p.replication_idx_, my_committed_idx, p.replication_idx_);
}
}
return res;
}

uint32_t RaftReplDev::get_blk_size() const { return data_service().get_blk_size(); }

nuraft_mesg::repl_service_ctx* RaftReplDev::group_msg_service() { return m_repl_svc_ctx.get(); }
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class RaftReplDev : public ReplDev,
bool is_leader() const override;
replica_id_t get_leader_id() const override;
std::vector< peer_info > get_replication_status() const override;
std::set< replica_id_t > get_active_peers() const;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
Expand Down

0 comments on commit 07082de

Please sign in to comment.