Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PushData only pushed to active followers. #584

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.9"
version = "6.5.10"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
4 changes: 4 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ table Consensus {

// Log difference to determine if the follower is in resync mode
resync_log_idx_threshold: int64 = 100;

// Log index difference, measured from the leader's point of view, beyond which a follower is
// considered laggy. The leader stops pushing data to a laggy follower until its lag drops back under this threshold.
laggy_threshold: int64 = 2000;
}

table HomeStoreSettings {
Expand Down
58 changes: 42 additions & 16 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,23 +363,30 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance())
.thenValue([this, rreq = std::move(rreq)](auto e) {
if (e.hasError()) {
RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", rreq->to_string(),
e.error());
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
return;
auto peers = get_active_peers();
auto calls = std::vector< nuraft_mesg::NullAsyncResult >();
for (auto peer : peers) {
RD_LOGD("Data Channel: Pushing data to follower {}, rreq=[{}]", peer, rreq->to_string());
calls.push_back(group_msg_service()
->data_service_request_unidirectional(peer, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance()));
}
folly::collectAllUnsafe(calls).thenValue([this, rreq](auto&& v_res) {
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved
for (auto const& res : v_res) {
if (sisl_likely(res.value())) {
auto r = res.value();
if (r.hasError()) {
// Just logging PushData error, no action is needed as follower can try by fetchData.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part makes sense

RD_LOGW("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}",
rreq->to_string(), r.error());
}
}
// Release the buffer which holds the packets
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
}
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
// Release the buffer which holds the packets
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
}

void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
Expand Down Expand Up @@ -1039,6 +1046,25 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {
return pi;
}

// Returns the ids of the peers whose replication index is close enough to this replica's
// committed lsn to be treated as active.
//
// A peer reported by get_replication_status() is included when its replication_idx_ is at least
// (m_commit_upto_lsn - consensus.laggy_threshold), with that lower bound floored at 0. The entry
// for this replica itself (m_my_repl_id) is never included. Every excluded peer is logged at
// warning level together with its lag.
std::set< replica_id_t > RaftReplDev::get_active_peers() const {
    const auto repl_status = get_replication_status();
    const auto my_committed_idx = m_commit_upto_lsn.load();

    // Read the dynamic setting exactly once: the value may be reloaded at any time, and the
    // comparison and the subtraction below must agree on the same threshold.
    const auto laggy_threshold = HS_DYNAMIC_CONFIG(consensus.laggy_threshold);
    const uint64_t least_active_repl_idx =
        my_committed_idx > laggy_threshold ? my_committed_idx - laggy_threshold : 0;

    std::set< replica_id_t > res;
    // Iterate by const reference so each peer_info entry is not copied per iteration.
    for (const auto& p : repl_status) {
        if (p.id_ == m_my_repl_id) { continue; }
        if (p.replication_idx_ >= least_active_repl_idx) {
            res.insert(p.id_);
        } else {
            RD_LOGW("Excluding peer {} from active_peers, lag {}, my lsn {}, peer lsn {}", p.id_,
                    my_committed_idx - p.replication_idx_, my_committed_idx, p.replication_idx_);
        }
    }
    return res;
}

// Block size as reported by the data service; this simply forwards data_service().get_blk_size().
uint32_t RaftReplDev::get_blk_size() const {
    return data_service().get_blk_size();
}

// Accessor for the group's messaging-service context: returns whatever m_repl_svc_ctx.get() yields.
nuraft_mesg::repl_service_ctx* RaftReplDev::group_msg_service() {
    return m_repl_svc_ctx.get();
}
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class RaftReplDev : public ReplDev,
// Override of a base-class virtual; the definition is not part of this view.
bool is_leader() const override;
// Override of a base-class virtual; the definition is not part of this view.
replica_id_t get_leader_id() const override;
// Override of a base-class virtual returning one peer_info entry per reported peer.
std::vector< peer_info > get_replication_status() const override;
// Returns the ids of peers (excluding this replica) whose replication index is within
// consensus.laggy_threshold of this replica's committed lsn.
std::set< replica_id_t > get_active_peers() const;
// Returns the stored group id (m_group_id).
group_id_t group_id() const override { return m_group_id; }
// Returns m_group_id formatted via boost::uuids::to_string.
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
// Returns a copy of m_rdev_name.
std::string rdev_name() const { return m_rdev_name; }
Expand Down
Loading