From 7ff78d3cb516bb733a508801817c1eac44f5c485 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Thu, 7 Nov 2024 16:56:38 +0800 Subject: [PATCH] PushData only pushed to active followers. If a follower is lagging too far, do not flood it with data from new IOs (new rreq, new LSNs) , reserve the capability for catching up, that follower can request data via FetchData. Signed-off-by: Xiaoxi Chen --- src/lib/common/homestore_config.fbs | 3 + .../replication/repl_dev/raft_repl_dev.cpp | 58 ++++++++++++++----- src/lib/replication/repl_dev/raft_repl_dev.h | 1 + 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 24aae9f20..003a91606 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -264,6 +264,9 @@ table Consensus { // Log difference to determine if the follower is in resync mode resync_log_idx_threshold: int64 = 100; + + // Log difference from leader's point of view, to determine if the follower is laggy. + laggy_threshold: int64 = 2000; } table HomeStoreSettings { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 59916d039..5f7ab15bb 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -363,23 +363,30 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t), PushDataRequestTypeTable()));*/ - RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_string()); - - group_msg_service() - ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts) - .via(&folly::InlineExecutor::instance()) - .thenValue([this, rreq = std::move(rreq)](auto e) { - if (e.hasError()) { - RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", rreq->to_string(), - e.error()); - handle_error(rreq, RaftReplService::to_repl_error(e.error())); - return; + auto peers = get_active_peers(); + auto calls = std::vector< nuraft_mesg::NullAsyncResult >(); + for (auto peer : peers) { + RD_LOGD("Data Channel: Pushing data to follower {}, rreq=[{}]", peer, rreq->to_string()); + calls.push_back(group_msg_service() + ->data_service_request_unidirectional(peer, PUSH_DATA, rreq->m_pkts) + .via(&folly::InlineExecutor::instance())); + } + folly::collectAllUnsafe(calls).thenValue([this, rreq](auto&& v_res) { + for (auto const& res : v_res) { + if (sisl_likely(res.value())) { + auto r = res.value(); + if (r.hasError()) { + // Just logging PushData error, no action is needed as follower can try by fetchData. + RD_LOGW("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", + rreq->to_string(), r.error()); + } } - // Release the buffer which holds the packets - RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string()); - rreq->release_fb_builder(); - rreq->m_pkts.clear(); - }); + } + RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string()); + // Release the buffer which holds the packets + rreq->release_fb_builder(); + rreq->m_pkts.clear(); + }); } void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { @@ -1020,6 +1027,25 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const { return pi; } +std::set< replica_id_t > RaftReplDev::get_active_peers() const { + auto repl_status = get_replication_status(); + std::set< replica_id_t > res; + auto my_committed_idx = m_commit_upto_lsn.load(); + uint64_t lagThreshold = my_committed_idx > HS_DYNAMIC_CONFIG(consensus.laggy_threshold) + ? my_committed_idx - HS_DYNAMIC_CONFIG(consensus.laggy_threshold) + : 0; + for (auto p : repl_status) { + if (p.id_ == m_my_repl_id) { continue; } + if (p.replication_idx_ >= lagThreshold) { + res.insert(p.id_); + } else { + RD_LOGW("Excluding peer {} from active_peers, lag {}, my lsn {}, peer lsn {}", p.id_, + my_committed_idx - p.replication_idx_, my_committed_idx, p.replication_idx_); + } + } + return res; +} + uint32_t RaftReplDev::get_blk_size() const { return data_service().get_blk_size(); } nuraft_mesg::repl_service_ctx* RaftReplDev::group_msg_service() { return m_repl_svc_ctx.get(); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 4be98394c..b426c82d9 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -176,6 +176,7 @@ class RaftReplDev : public ReplDev, bool is_leader() const override; replica_id_t get_leader_id() const override; std::vector< peer_info > get_replication_status() const override; + std::set< replica_id_t > get_active_peers() const; group_id_t group_id() const override { return m_group_id; } std::string group_id_str() const { return boost::uuids::to_string(m_group_id); } std::string rdev_name() const { return m_rdev_name; }