From 18e9b4fe64eb79e0d1e4efcb569f904541f1f14c Mon Sep 17 00:00:00 2001 From: "Ravi Akella email = raakella@ebay.com" Date: Mon, 11 Sep 2023 10:43:02 -0700 Subject: [PATCH 1/5] update sisl --- conanfile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conanfile.py b/conanfile.py index 92b0230..df7dde9 100644 --- a/conanfile.py +++ b/conanfile.py @@ -3,7 +3,7 @@ class NuRaftGrpcConan(ConanFile): name = "nuraft_grpc" - version = "6.0.2" + version = "6.0.3" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" topics = ("ebay", "nublox", "raft") @@ -50,7 +50,7 @@ def requirements(self): self.requires("boost/1.79.0") self.requires("nuraft/nbi.2.2.0") self.requires("openssl/1.1.1s") - self.requires("sisl/8.6.4") + self.requires("sisl/8.6.5") self.requires("lz4/1.9.4", override=True) From 6c6128216a6262e865c35a7c603c108d4131954c Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Wed, 6 Dec 2023 14:20:46 -0700 Subject: [PATCH 2/5] do not crash if grpc server creation fails --- conanfile.py | 2 +- src/lib/messaging.cpp | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/conanfile.py b/conanfile.py index df7dde9..fdca4a3 100644 --- a/conanfile.py +++ b/conanfile.py @@ -3,7 +3,7 @@ class NuRaftGrpcConan(ConanFile): name = "nuraft_grpc" - version = "6.0.3" + version = "6.0.4" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" topics = ("ebay", "nublox", "raft") diff --git a/src/lib/messaging.cpp b/src/lib/messaging.cpp index 61c5666..2064fb2 100644 --- a/src/lib/messaging.cpp +++ b/src/lib/messaging.cpp @@ -103,9 +103,21 @@ void service::restart_server() { LOGINFO("Starting Messaging Service on http://{}", listen_address); std::lock_guard< std::mutex > lg(_manager_lock); + sisl::GrpcServer* tmp_server = nullptr; + try { + tmp_server = sisl::GrpcServer::make(listen_address, _start_params.auth_mgr, grpc_server_threads, + _start_params.ssl_key, _start_params.ssl_cert); + } catch (std::runtime_error const& e) { + LOGERROR("Failed to create GRPC server for Messaging Service: {}", e.what()); + return; + } + if (!tmp_server) { + LOGERROR("Failed to create GRPC server: for Messaging Service"); + return; + } + _grpc_server.reset(); - _grpc_server = std::unique_ptr< sisl::GrpcServer >(sisl::GrpcServer::make( - listen_address, _start_params.auth_mgr, grpc_server_threads, _start_params.ssl_key, _start_params.ssl_cert)); + _grpc_server = std::unique_ptr< sisl::GrpcServer >(tmp_server); _mesg_service->associate(_grpc_server.get()); _grpc_server->run(); From 314264b5209d5309b4390fde79ebfbe41d148cb7 Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Wed, 6 Dec 2023 15:25:48 -0700 Subject: [PATCH 3/5] upgrade grpc --- conanfile.py | 1 + 1 file changed, 1 insertion(+) diff --git a/conanfile.py b/conanfile.py index fdca4a3..3a2717f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -53,6 +53,7 @@ def requirements(self): self.requires("sisl/8.6.5") self.requires("lz4/1.9.4", override=True) + self.requires("grpc/1.50.1", override=True) def build(self): cmake = CMake(self) From 3bd4f30b1ac0af754b067503bb54be597fbe9193 Mon Sep 17 00:00:00 2001 From: shosseinimotlagh Date: Wed, 7 Feb 2024 10:17:09 -0800 Subject: [PATCH 4/5] bump up version for metrics shrinkage --- conanfile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conanfile.py b/conanfile.py index 3a2717f..e191327 100644 --- a/conanfile.py +++ b/conanfile.py @@ -3,7 +3,7 @@ class NuRaftGrpcConan(ConanFile): name = "nuraft_grpc" - version = "6.0.4" + version = "6.0.5" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" topics = ("ebay", "nublox", "raft") @@ -50,7 +50,7 @@ def requirements(self): self.requires("boost/1.79.0") self.requires("nuraft/nbi.2.2.0") self.requires("openssl/1.1.1s") - self.requires("sisl/8.6.5") + self.requires("sisl/8.6.8") self.requires("lz4/1.9.4", override=True) self.requires("grpc/1.50.1", override=True) From bc883a44957ccc2dc1da964811c7105f7dd5d137 Mon Sep 17 00:00:00 2001 From: Mehdi Hosseini <116847813+shosseinimotlagh@users.noreply.github.com> Date: Fri, 19 Apr 2024 11:31:15 -0700 Subject: [PATCH 5/5] Add get_leader() for using in AM rest API (#80) --- .clang-format | 1 - conanfile.py | 2 +- src/include/grpc_server.hpp | 1 + src/include/messaging.hpp | 2 +- src/include/messaging_if.hpp | 2 ++ src/lib/grpc_server.cpp | 5 ++-- src/lib/messaging.cpp | 6 ++-- src/lib/service.cpp | 55 ++++++++++++++++++++++++++++++------ src/lib/service.hpp | 9 +++--- src/tests/MessagingTest.cpp | 2 ++ 10 files changed, 65 insertions(+), 20 deletions(-) diff --git a/.clang-format b/.clang-format index 2f77120..fdfa11f 100644 --- a/.clang-format +++ b/.clang-format @@ -18,7 +18,6 @@ AlignOperands: false AlignTrailingComments: true AllowShortBlocksOnASingleLine: true AllowShortIfStatementsOnASingleLine: true -AllowShortBlocksOnASingleLine: true AllowShortCaseLabelsOnASingleLine: false # AllowShortFunctionsOnASingleLine: InlineOnly # AllowShortLoopsOnASingleLine: false diff --git a/conanfile.py b/conanfile.py index e191327..9448c6a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -3,7 +3,7 @@ class NuRaftGrpcConan(ConanFile): name = "nuraft_grpc" - version = "6.0.5" + version = "6.0.6" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" topics = ("ebay", "nublox", "raft") diff --git a/src/include/grpc_server.hpp b/src/include/grpc_server.hpp index 99ad099..d223be6 100644 --- a/src/include/grpc_server.hpp +++ b/src/include/grpc_server.hpp @@ -48,6 +48,7 @@ class grpc_server : public nuraft::raft_server_handler { void yield_leadership(bool immediate = false); void get_srv_config_all(std::vector< nuraft::ptr< nuraft::srv_config > >& configs_out); + int get_leader(); nuraft::ptr< nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > > append_entries(const std::vector< nuraft::ptr< nuraft::buffer > >& logs); diff --git a/src/include/messaging.hpp b/src/include/messaging.hpp index 9dedb15..278ec5b 100644 --- a/src/include/messaging.hpp +++ b/src/include/messaging.hpp @@ -23,7 +23,6 @@ #include #include - #include "messaging_if.hpp" namespace sisl { @@ -84,6 +83,7 @@ class service : public consensus_component { uint32_t logstore_id(std::string const& group_id) const override; void get_peers(std::string const& group_id, std::list< std::string >&) const override; void restart_server() override; + server_info_t get_leader(std::string const& group_name) override; // data service APIs bool bind_data_service_request(std::string const& request_name, std::string const& group_id, diff --git a/src/include/messaging_if.hpp b/src/include/messaging_if.hpp index ea914e5..b91f317 100644 --- a/src/include/messaging_if.hpp +++ b/src/include/messaging_if.hpp @@ -38,6 +38,7 @@ using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc:: namespace nuraft_mesg { using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; +using server_info_t = std::pair< int, std::string >; // called by the server after it receives the request using data_service_request_handler_t = std::function< void(sisl::io_blob const& incoming_buf, void* rpc_data) >; @@ -130,6 +131,7 @@ class consensus_component { virtual uint32_t logstore_id(std::string const& group_id) const = 0; virtual int32_t server_id() const = 0; virtual void restart_server() = 0; + virtual server_info_t get_leader(std::string const& group_name) = 0; // data channel APIs virtual bool bind_data_service_request(std::string const& request_name, std::string const& group_id, diff --git a/src/lib/grpc_server.cpp b/src/lib/grpc_server.cpp index 139e262..465f950 100644 --- a/src/lib/grpc_server.cpp +++ b/src/lib/grpc_server.cpp @@ -43,8 +43,8 @@ static shared< nuraft::req_msg > toRequest(RaftMessage const& raft_msg) { for (auto const& log : req.log_entries()) { auto log_buffer = nuraft::buffer::alloc(log.buffer().size()); memcpy(log_buffer->data(), log.buffer().data(), log.buffer().size()); - log_entries.push_back( - std::make_shared< nuraft::log_entry >(log.term(), log_buffer, (nuraft::log_val_type)log.type(), log.timestamp())); + log_entries.push_back(std::make_shared< nuraft::log_entry >(log.term(), log_buffer, + (nuraft::log_val_type)log.type(), log.timestamp())); } return message; } @@ -60,6 +60,7 @@ bool grpc_server::request_leadership() { return _raft_server->request_leadership void grpc_server::get_srv_config_all(std::vector< nuraft::ptr< nuraft::srv_config > >& configs_out) { _raft_server->get_srv_config_all(configs_out); } +int grpc_server::get_leader() { return _raft_server->get_leader(); } nuraft::ptr< nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > > grpc_server::rem_srv(int const member_id) { return _raft_server->remove_srv(member_id); diff --git a/src/lib/messaging.cpp b/src/lib/messaging.cpp index 2064fb2..90e4576 100644 --- a/src/lib/messaging.cpp +++ b/src/lib/messaging.cpp @@ -142,7 +142,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i case nuraft::cb_func::JoinedCluster: { auto const my_id = param->myId; auto const leader_id = param->leaderId; - LOGINFO("Joined cluster: {}, [l_id:{},my_id:{}]", group_id, leader_id, my_id); + LOGINFO("Joined cluster: {}, [leader_id:{}, my_id:{}]", group_id, leader_id, my_id); { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader[group_id] = (leader_id == my_id); @@ -161,7 +161,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i _config_change.notify_all(); } break; case nuraft::cb_func::BecomeFollower: { - LOGDEBUGMOD(nuraft_mesg, "I'm a follower of: {}!", group_id); + LOGDEBUGMOD(nuraft_mesg, "I'm a follower of: {} with the leader {}!", group_id, param->leaderId); { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader[group_id] = false; @@ -430,6 +430,8 @@ void service::get_srv_config_all(std::string const& group_name, _mesg_service->get_srv_config_all(group_name, configs_out); } +server_info_t service::get_leader(std::string const& group_name) { return _mesg_service->get_leader(group_name); } + bool service::bind_data_service_request(std::string const& request_name, std::string const& group_id, data_service_request_handler_t const& request_handler) { return _mesg_service->bind_data_service_request(request_name, group_id, request_handler); diff --git a/src/lib/service.cpp b/src/lib/service.cpp index df16b2e..2d7e4a0 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -89,7 +89,9 @@ nuraft::cmd_result_code msg_service::add_srv(group_name_t const& group_name, nur if (server) { try { return server->add_srv(cfg)->get_result_code(); - } catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); + } } return nuraft::SERVER_NOT_FOUND; } @@ -103,7 +105,9 @@ nuraft::cmd_result_code msg_service::rm_srv(group_name_t const& group_name, int if (server) { try { return server->rem_srv(member_id)->get_result_code(); - } catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what()); } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what()); + } } return nuraft::SERVER_NOT_FOUND; } @@ -136,8 +140,38 @@ void msg_service::get_srv_config_all(group_name_t const& group_name, try { server->get_srv_config_all(configs_out); return; - } catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); + } + } +} + +server_info_t msg_service::get_leader(group_name_t const& group_name) { + + shared< grpc_server > server; + { + std::shared_lock< lock_type > rl(_raft_servers_lock); + if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + } + if (server) { + try { + auto leader_id = server->get_leader(); + std::vector< std::shared_ptr< nuraft::srv_config > > configs; + server->get_srv_config_all(configs); + std::string leader_endpoint = ""; + auto it = std::find_if(configs.begin(), configs.end(), + [&leader_id](const auto& cfg) { return cfg->get_id() == leader_id; }); + + if (it != configs.end()) { + return std::make_pair(leader_id, (*it)->get_endpoint()); + } else { + LOGERRORMOD(nuraft_mesg, "Could not find leader in the server config"); + } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during get_leader(): {}", rte.what()); + } } + return std::make_pair(-1, ""); } nuraft::cmd_result_code msg_service::append_entries(group_name_t const& group_name, @@ -150,7 +184,9 @@ nuraft::cmd_result_code msg_service::append_entries(group_name_t const& group_na if (server) { try { return server->append_entries(logs)->get_result_code(); - } catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); + } } return nuraft::SERVER_NOT_FOUND; } @@ -212,7 +248,9 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, try { rpc_data->set_status(server->step(request.msg(), *response.mutable_msg())); return true; - } catch (std::runtime_error& rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); } + } catch (std::runtime_error& rte) { + LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); + } } else { LOGDEBUGMOD(nuraft_mesg, "Missing RAFT group: {}", group_name); } @@ -226,8 +264,8 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, class null_service final : public grpc_server { public: using grpc_server::grpc_server; - void associate(sisl::GrpcServer*) override{}; - void bind(sisl::GrpcServer*) override{}; + void associate(sisl::GrpcServer*) override {}; + void bind(sisl::GrpcServer*) override {}; }; class msg_group_listner : public nuraft::rpc_listener { @@ -274,7 +312,8 @@ std::error_condition msg_service::joinRaftGroup(int32_t const srv_id, group_name std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_name, group_name)); if (_raft_servers.end() != it && happened) { if (auto err = _get_server_ctx(srv_id, group_name, g_type, ctx, it->second.m_metrics); err) { - LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, err.message()); + LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, + err.message()); return err; } DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!"); diff --git a/src/lib/service.hpp b/src/lib/service.hpp index c98d5ee..8787c01 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -30,7 +30,6 @@ using group_type_t = std::string; class msg_service; class mesg_factory; class repl_service_ctx_grpc; - using lock_type = folly::SharedMutex; class group_metrics : public sisl::MetricsGroupWrapper { @@ -45,9 +44,9 @@ class group_metrics : public sisl::MetricsGroupWrapper { ~group_metrics() { deregister_me_from_farm(); } }; -using get_server_ctx_cb = std::function< std::error_condition(int32_t srv_id, group_name_t const&, group_type_t const&, - nuraft::context*& ctx_out, - shared< group_metrics > metrics) >; +using get_server_ctx_cb = + std::function< std::error_condition(int32_t srv_id, group_name_t const&, group_type_t const&, + nuraft::context*& ctx_out, shared< group_metrics > metrics) >; // pluggable type for data service using data_service_t = data_service_grpc; @@ -91,7 +90,7 @@ class msg_service : public std::enable_shared_from_this< msg_service > { nuraft::cmd_result_code rm_srv(group_name_t const& group_name, int const member_id); bool request_leadership(group_name_t const& group_name); void get_srv_config_all(group_name_t const& group_name, std::vector< shared< nuraft::srv_config > >& configs_out); - + server_info_t get_leader(group_name_t const& group_name); nuraft::cmd_result_code append_entries(group_name_t const& group_name, std::vector< nuraft::ptr< nuraft::buffer > > const& logs); diff --git a/src/tests/MessagingTest.cpp b/src/tests/MessagingTest.cpp index af0b0e2..e84a9e8 100644 --- a/src/tests/MessagingTest.cpp +++ b/src/tests/MessagingTest.cpp @@ -305,6 +305,8 @@ TEST_F(MessagingFixture, SyncAddMember) { srv_list.clear(); instance_1->get_srv_config_all("test_group", srv_list); EXPECT_EQ(srv_list.size(), 4u); + LOGINFO("Leader of server1: id: {} endpoint {}", instance_1->get_leader("test_group").first, + instance_1->get_leader("test_group").second); } class DataServiceFixture : public MessagingFixtureBase {