Skip to content

Commit

Permalink
Add get_leader_id/get_replication_status into replicationService
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Jan 30, 2024
1 parent 198657e commit 5a480f0
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 1 deletion.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.0.7"
version = "5.1.1"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
17 changes: 17 additions & 0 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ VENUM(repl_impl_type, uint8_t,
solo // For single node - no replication
);

struct peer_info {
// Peer ID.
replica_id_t id_;
// The last replication index that the peer has, from this server's point of view.
uint64_t replication_idx_;
// The elapsed time since the last successful response from this peer, set to 0 on leader
uint64_t last_succ_resp_us_;
};

class ReplApplication;

class ReplicationService {
Expand Down Expand Up @@ -52,6 +61,14 @@ class ReplicationService {
virtual hs_stats get_cap_stats() const = 0;

virtual meta_sub_type get_meta_blk_name() const = 0;

/// @brief get the leader replica_id of given group
virtual const replica_id_t get_leader_id(group_id_t group_id) const = 0;

/// @brief get replication status. If called on follower member
/// this API can return empty result.
virtual std::vector<peer_info> get_replication_status(group_id_t group_id) const = 0;

};

//////////////// Application which uses Replication needs to be provide the following callbacks ////////////////
Expand Down
8 changes: 8 additions & 0 deletions src/lib/replication/service/generic_repl_svc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_i
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}

const replica_id_t SoloReplService::get_leader_id(group_id_t group_id) const {
return m_my_uuid;
}

std::vector<peer_info> SoloReplService::get_replication_status(group_id_t group_id) const {
return std::vector<peer_info>{peer_info {.id_ = m_my_uuid, .replication_idx_ = 0, .last_succ_resp_us_ = 0}};
}

std::unique_ptr< CPContext > SoloReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; }

folly::Future< bool > SoloReplServiceCPHandler::cp_flush(CP* cp) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/service/generic_repl_svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class SoloReplService : public GenericReplService {
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const override;
const replica_id_t get_leader_id(group_id_t group_id) const override;
std::vector<peer_info> get_replication_status(group_id_t group_id) const override;
};

class SoloReplServiceCPHandler : public CPCallbacks {
Expand Down
27 changes: 27 additions & 0 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,33 @@ AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_i
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}

const replica_id_t RaftReplService::get_leader_id(group_id_t group_id) const {
auto repl_dev = get_repl_dev(group_id);
if (!repl_dev) { return replica_id_t(); }
auto repl_svc_ctx = dynamic_pointer_cast< RaftReplDev >(repl_dev.value())->group_msg_service();
if (!repl_svc_ctx) { return replica_id_t{}; }

auto leader = repl_svc_ctx->raft_leader_id();
return boost::lexical_cast< replica_id_t >(leader);
}

std::vector< peer_info > RaftReplService::get_replication_status(group_id_t group_id) const {
std::vector< peer_info > pi;
auto repl_dev = get_repl_dev(group_id);
if (!repl_dev) { return pi; }

auto repl_svc_ctx = dynamic_pointer_cast< RaftReplDev >(repl_dev.value())->group_msg_service();
if (!repl_svc_ctx) { return pi; }

auto rep_status = repl_svc_ctx->get_raft_status();
for (auto const& pinfo : rep_status) {
pi.emplace_back(peer_info{.id_ = boost::lexical_cast< replica_id_t >(pinfo.id_),
.replication_idx_ = pinfo.last_log_idx_,
.last_succ_resp_us_ = pinfo.last_succ_resp_us_});
}
return pi;
}

///////////////////// RaftReplService CP Callbacks /////////////////////////////
std::unique_ptr< CPContext > RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; }

Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class RaftReplService : public GenericReplService,
std::shared_ptr< nuraft_mesg::mesg_state_mgr > create_state_mgr(int32_t srv_id,
nuraft_mesg::group_id_t const& group_id) override;
nuraft_mesg::Manager& msg_manager() { return *m_msg_mgr; }
const replica_id_t get_leader_id(group_id_t group_id) const override;
std::vector<peer_info> get_replication_status(group_id_t group_id) const override;

protected:
///////////////////// Overrides of GenericReplService ////////////////////
Expand Down
33 changes: 33 additions & 0 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,39 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
g_helper->sync_for_cleanup_start();
}


TEST_F(RaftReplDevTest, All_ReplService) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();
auto group_id = pick_one_db().repl_dev()->group_id();
auto my_id_str = dynamic_cast< RaftReplDev* >(pick_one_db().repl_dev())->my_replica_id_str();

auto leader = hs()->repl_service().get_leader_id(group_id);
ASSERT_TRUE(leader != replica_id_t())
<< "Error getting leader id for group_id=" << boost::uuids::to_string(group_id).c_str();
auto leader_str = boost::uuids::to_string(leader);
LOGINFO("Got raft leader {} for group {}", leader_str, group_id);

if (g_helper->replica_num() == 0) {
ASSERT_TRUE(leader_str == my_id_str)
<< "Leader id " << leader_str.c_str() << " should equals to my ID " << my_id_str.c_str();
} else {
ASSERT_TRUE(leader_str != my_id_str) << "I am a follower, Leader id " << leader_str.c_str()
<< " should not equals to my ID " << my_id_str.c_str();
}

auto peers_info = hs()->repl_service().get_replication_status(group_id);
LOGINFO("Got peers_info size {} for group {}", peers_info.size(), group_id);
if (g_helper->replica_num() == 0) {
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
EXPECT_TRUE(peers_info.size() == num_replicas)
<< "Expecting peers_info size " << peers_info.size() << " but got " << peers_info.size();
} else {
EXPECT_TRUE(peers_info.size() == 0) << "Expecting zero length on follower, got " << peers_info.size();
}
g_helper->sync_for_cleanup_start();
}

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;
Expand Down

0 comments on commit 5a480f0

Please sign in to comment.