Skip to content

Commit

Permalink
Add get_leader() for using in AM rest API (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
shosseinimotlagh authored Apr 19, 2024
1 parent a8d0a1e commit bc883a4
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 20 deletions.
1 change: 0 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ AlignOperands: false
AlignTrailingComments: true
AllowShortBlocksOnASingleLine: true
AllowShortIfStatementsOnASingleLine: true
AllowShortBlocksOnASingleLine: true
AllowShortCaseLabelsOnASingleLine: false
# AllowShortFunctionsOnASingleLine: InlineOnly
# AllowShortLoopsOnASingleLine: false
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class NuRaftGrpcConan(ConanFile):
name = "nuraft_grpc"
version = "6.0.5"
version = "6.0.6"
homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
topics = ("ebay", "nublox", "raft")
Expand Down
1 change: 1 addition & 0 deletions src/include/grpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class grpc_server : public nuraft::raft_server_handler {
void yield_leadership(bool immediate = false);

void get_srv_config_all(std::vector< nuraft::ptr< nuraft::srv_config > >& configs_out);
int get_leader();

nuraft::ptr< nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > >
append_entries(const std::vector< nuraft::ptr< nuraft::buffer > >& logs);
Expand Down
2 changes: 1 addition & 1 deletion src/include/messaging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <system_error>

#include <sisl/logging/logging.h>

#include "messaging_if.hpp"

namespace sisl {
Expand Down Expand Up @@ -84,6 +83,7 @@ class service : public consensus_component {
uint32_t logstore_id(std::string const& group_id) const override;
void get_peers(std::string const& group_id, std::list< std::string >&) const override;
void restart_server() override;
server_info_t get_leader(std::string const& group_name) override;

// data service APIs
bool bind_data_service_request(std::string const& request_name, std::string const& group_id,
Expand Down
2 changes: 2 additions & 0 deletions src/include/messaging_if.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::
namespace nuraft_mesg {

using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;
using server_info_t = std::pair< int, std::string >;

// called by the server after it receives the request
using data_service_request_handler_t = std::function< void(sisl::io_blob const& incoming_buf, void* rpc_data) >;
Expand Down Expand Up @@ -130,6 +131,7 @@ class consensus_component {
virtual uint32_t logstore_id(std::string const& group_id) const = 0;
virtual int32_t server_id() const = 0;
virtual void restart_server() = 0;
virtual server_info_t get_leader(std::string const& group_name) = 0;

// data channel APIs
virtual bool bind_data_service_request(std::string const& request_name, std::string const& group_id,
Expand Down
5 changes: 3 additions & 2 deletions src/lib/grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ static shared< nuraft::req_msg > toRequest(RaftMessage const& raft_msg) {
for (auto const& log : req.log_entries()) {
auto log_buffer = nuraft::buffer::alloc(log.buffer().size());
memcpy(log_buffer->data(), log.buffer().data(), log.buffer().size());
log_entries.push_back(
std::make_shared< nuraft::log_entry >(log.term(), log_buffer, (nuraft::log_val_type)log.type(), log.timestamp()));
log_entries.push_back(std::make_shared< nuraft::log_entry >(log.term(), log_buffer,
(nuraft::log_val_type)log.type(), log.timestamp()));
}
return message;
}
Expand All @@ -60,6 +60,7 @@ bool grpc_server::request_leadership() { return _raft_server->request_leadership
void grpc_server::get_srv_config_all(std::vector< nuraft::ptr< nuraft::srv_config > >& configs_out) {
_raft_server->get_srv_config_all(configs_out);
}
int grpc_server::get_leader() { return _raft_server->get_leader(); }

nuraft::ptr< nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > > grpc_server::rem_srv(int const member_id) {
return _raft_server->remove_srv(member_id);
Expand Down
6 changes: 4 additions & 2 deletions src/lib/messaging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i
case nuraft::cb_func::JoinedCluster: {
auto const my_id = param->myId;
auto const leader_id = param->leaderId;
LOGINFO("Joined cluster: {}, [l_id:{},my_id:{}]", group_id, leader_id, my_id);
LOGINFO("Joined cluster: {}, [leader_id:{}, my_id:{}]", group_id, leader_id, my_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = (leader_id == my_id);
Expand All @@ -161,7 +161,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i
_config_change.notify_all();
} break;
case nuraft::cb_func::BecomeFollower: {
LOGDEBUGMOD(nuraft_mesg, "I'm a follower of: {}!", group_id);
LOGDEBUGMOD(nuraft_mesg, "I'm a follower of: {} with the leader {}!", group_id, param->leaderId);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = false;
Expand Down Expand Up @@ -430,6 +430,8 @@ void service::get_srv_config_all(std::string const& group_name,
_mesg_service->get_srv_config_all(group_name, configs_out);
}

server_info_t service::get_leader(std::string const& group_name) { return _mesg_service->get_leader(group_name); }

bool service::bind_data_service_request(std::string const& request_name, std::string const& group_id,
data_service_request_handler_t const& request_handler) {
return _mesg_service->bind_data_service_request(request_name, group_id, request_handler);
Expand Down
55 changes: 47 additions & 8 deletions src/lib/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ nuraft::cmd_result_code msg_service::add_srv(group_name_t const& group_name, nur
if (server) {
try {
return server->add_srv(cfg)->get_result_code();
} catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); }
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what());
}
}
return nuraft::SERVER_NOT_FOUND;
}
Expand All @@ -103,7 +105,9 @@ nuraft::cmd_result_code msg_service::rm_srv(group_name_t const& group_name, int
if (server) {
try {
return server->rem_srv(member_id)->get_result_code();
} catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what()); }
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what());
}
}
return nuraft::SERVER_NOT_FOUND;
}
Expand Down Expand Up @@ -136,8 +140,38 @@ void msg_service::get_srv_config_all(group_name_t const& group_name,
try {
server->get_srv_config_all(configs_out);
return;
} catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); }
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what());
}
}
}

server_info_t msg_service::get_leader(group_name_t const& group_name) {

shared< grpc_server > server;
{
std::shared_lock< lock_type > rl(_raft_servers_lock);
if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; }
}
if (server) {
try {
auto leader_id = server->get_leader();
std::vector< std::shared_ptr< nuraft::srv_config > > configs;
server->get_srv_config_all(configs);
std::string leader_endpoint = "";
auto it = std::find_if(configs.begin(), configs.end(),
[&leader_id](const auto& cfg) { return cfg->get_id() == leader_id; });

if (it != configs.end()) {
return std::make_pair(leader_id, (*it)->get_endpoint());
} else {
LOGERRORMOD(nuraft_mesg, "Could not find leader in the server config");
}
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during get_leader(): {}", rte.what());
}
}
return std::make_pair(-1, "");
}

nuraft::cmd_result_code msg_service::append_entries(group_name_t const& group_name,
Expand All @@ -150,7 +184,9 @@ nuraft::cmd_result_code msg_service::append_entries(group_name_t const& group_na
if (server) {
try {
return server->append_entries(logs)->get_result_code();
} catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); }
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what());
}
}
return nuraft::SERVER_NOT_FOUND;
}
Expand Down Expand Up @@ -212,7 +248,9 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg,
try {
rpc_data->set_status(server->step(request.msg(), *response.mutable_msg()));
return true;
} catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); }
} catch (std::runtime_error& rte) {
LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what());
}
} else {
LOGDEBUGMOD(nuraft_mesg, "Missing RAFT group: {}", group_name);
}
Expand All @@ -226,8 +264,8 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg,
class null_service final : public grpc_server {
public:
using grpc_server::grpc_server;
void associate(sisl::GrpcServer*) override{};
void bind(sisl::GrpcServer*) override{};
void associate(sisl::GrpcServer*) override {};
void bind(sisl::GrpcServer*) override {};
};

class msg_group_listner : public nuraft::rpc_listener {
Expand Down Expand Up @@ -274,7 +312,8 @@ std::error_condition msg_service::joinRaftGroup(int32_t const srv_id, group_name
std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_name, group_name));
if (_raft_servers.end() != it && happened) {
if (auto err = _get_server_ctx(srv_id, group_name, g_type, ctx, it->second.m_metrics); err) {
LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, err.message());
LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name,
err.message());
return err;
}
DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!");
Expand Down
9 changes: 4 additions & 5 deletions src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ using group_type_t = std::string;
class msg_service;
class mesg_factory;
class repl_service_ctx_grpc;

using lock_type = folly::SharedMutex;

class group_metrics : public sisl::MetricsGroupWrapper {
Expand All @@ -45,9 +44,9 @@ class group_metrics : public sisl::MetricsGroupWrapper {
~group_metrics() { deregister_me_from_farm(); }
};

using get_server_ctx_cb = std::function< std::error_condition(int32_t srv_id, group_name_t const&, group_type_t const&,
nuraft::context*& ctx_out,
shared< group_metrics > metrics) >;
using get_server_ctx_cb =
std::function< std::error_condition(int32_t srv_id, group_name_t const&, group_type_t const&,
nuraft::context*& ctx_out, shared< group_metrics > metrics) >;

// pluggable type for data service
using data_service_t = data_service_grpc;
Expand Down Expand Up @@ -91,7 +90,7 @@ class msg_service : public std::enable_shared_from_this< msg_service > {
nuraft::cmd_result_code rm_srv(group_name_t const& group_name, int const member_id);
bool request_leadership(group_name_t const& group_name);
void get_srv_config_all(group_name_t const& group_name, std::vector< shared< nuraft::srv_config > >& configs_out);

server_info_t get_leader(group_name_t const& group_name);
nuraft::cmd_result_code append_entries(group_name_t const& group_name,
std::vector< nuraft::ptr< nuraft::buffer > > const& logs);

Expand Down
2 changes: 2 additions & 0 deletions src/tests/MessagingTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ TEST_F(MessagingFixture, SyncAddMember) {
srv_list.clear();
instance_1->get_srv_config_all("test_group", srv_list);
EXPECT_EQ(srv_list.size(), 4u);
LOGINFO("Leader of server1: id: {} endpoint {}", instance_1->get_leader("test_group").first,
instance_1->get_leader("test_group").second);
}

class DataServiceFixture : public MessagingFixtureBase {
Expand Down

0 comments on commit bc883a4

Please sign in to comment.