From 3080c67781d3606f2a1616a12197d94853cc6bdd Mon Sep 17 00:00:00 2001 From: Brian Szmyd Date: Sat, 11 Nov 2023 10:42:22 -0800 Subject: [PATCH] More cleanup (#48) --- .codecov.yml | 1 + conanfile.py | 2 +- src/include/nuraft_mesg/grpc_factory.hpp | 63 ------------- src/include/nuraft_mesg/mesg_factory.hpp | 45 ++++++++- src/include/nuraft_mesg/mesg_state_mgr.hpp | 6 +- src/include/nuraft_mesg/nuraft_mesg.hpp | 6 +- src/lib/CMakeLists.txt | 13 +-- src/lib/factory.cpp | 5 +- src/lib/manager_impl.cpp | 74 +-------------- src/lib/manager_impl.hpp | 20 +--- src/lib/proto/proto_client.cpp | 3 + src/lib/proto/proto_service.cpp | 104 +++++++++++---------- src/lib/proto/proto_service.hpp | 25 ----- src/lib/proto/utils.hpp | 9 +- src/lib/repl_service_ctx.cpp | 82 ++++++++++++++++ src/lib/repl_service_ctx.hpp | 30 ++++++ src/lib/service.cpp | 21 ++--- src/lib/service.hpp | 11 +-- src/tests/MessagingTest.cpp | 25 +++-- src/tests/test_state_manager.cpp | 34 ++++--- src/tests/test_state_manager.h | 9 +- 21 files changed, 287 insertions(+), 301 deletions(-) delete mode 100644 src/include/nuraft_mesg/grpc_factory.hpp delete mode 100644 src/lib/proto/proto_service.hpp create mode 100644 src/lib/repl_service_ctx.cpp create mode 100644 src/lib/repl_service_ctx.hpp diff --git a/.codecov.yml b/.codecov.yml index 628a522..2d0f851 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -14,3 +14,4 @@ ignore: - "**/*.pb.h" - "**/tests/*" - "**/jungle_logstore/*" + - "**/flatb/*" diff --git a/conanfile.py b/conanfile.py index 14b0b4a..4594576 100644 --- a/conanfile.py +++ b/conanfile.py @@ -8,7 +8,7 @@ class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" - version = "2.0.3" + version = "2.0.4" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" diff --git a/src/include/nuraft_mesg/grpc_factory.hpp b/src/include/nuraft_mesg/grpc_factory.hpp deleted file mode 100644 index b709072..0000000 --- a/src/include/nuraft_mesg/grpc_factory.hpp +++ /dev/null @@ -1,63 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ - -// Brief: -// Implements cornerstone's rpc_client_factory providing sisl::GrpcAsyncClient -// inherited rpc_client instances sharing a common worker pool. -#pragma once - -#include -#include - -#include -#include - -#include "common.hpp" - -namespace nuraft_mesg { - -using client_factory_lock_type = folly::SharedMutex; - -class grpc_factory : public nuraft::rpc_client_factory, public std::enable_shared_from_this< grpc_factory > { - std::string _worker_name; - -protected: - client_factory_lock_type _client_lock; - std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients; - -public: - grpc_factory(int const cli_thread_count, std::string const& name); - ~grpc_factory() override = default; - - std::string const& workerName() const { return _worker_name; } - - nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override; - nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client); - - virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; - - virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; - - // Construct and send an AddServer message to the cluster - NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg); - - // Send a client request to the cluster - NullAsyncResult append_entry(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg); - - // Construct and send a RemoveServer message to the cluster - NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg); -}; - -} // namespace nuraft_mesg diff --git a/src/include/nuraft_mesg/mesg_factory.hpp b/src/include/nuraft_mesg/mesg_factory.hpp index fafa723..48cea1b 100644 --- a/src/include/nuraft_mesg/mesg_factory.hpp +++ b/src/include/nuraft_mesg/mesg_factory.hpp @@ -14,24 +14,59 @@ *********************************************************************************/ #pragma once -#include #include #include #include #include - -#include "grpc_factory.hpp" +#include #include #include -#include "nuraft_mesg.hpp" +#include + +#include "common.hpp" namespace sisl { struct io_blob; -} +class GrpcTokenClient; +} // namespace sisl namespace nuraft_mesg { +using client_factory_lock_type = folly::SharedMutex; +// Brief: +// Implements cornerstone's rpc_client_factory providing sisl::GrpcAsyncClient +// inherited rpc_client instances sharing a common worker pool. +class grpc_factory : public nuraft::rpc_client_factory, public std::enable_shared_from_this< grpc_factory > { + std::string _worker_name; + +protected: + client_factory_lock_type _client_lock; + std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients; + +public: + grpc_factory(int const cli_thread_count, std::string const& name); + ~grpc_factory() override = default; + + std::string const& workerName() const { return _worker_name; } + + nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override; + nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client); + + virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + + virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + + // Construct and send an AddServer message to the cluster + NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg); + + // Send a client request to the cluster + NullAsyncResult append_entry(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg); + + // Construct and send a RemoveServer message to the cluster + NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg); +}; + class group_factory : public grpc_factory { std::shared_ptr< sisl::GrpcTokenClient > m_token_client; static std::string m_ssl_cert; diff --git a/src/include/nuraft_mesg/mesg_state_mgr.hpp b/src/include/nuraft_mesg/mesg_state_mgr.hpp index b53ff63..9fa635c 100644 --- a/src/include/nuraft_mesg/mesg_state_mgr.hpp +++ b/src/include/nuraft_mesg/mesg_state_mgr.hpp @@ -2,15 +2,11 @@ #include +#include #include #include "common.hpp" -namespace boost { -template < class T > -class intrusive_ptr; -} // namespace boost - namespace sisl { class GenericRpcData; } diff --git a/src/include/nuraft_mesg/nuraft_mesg.hpp b/src/include/nuraft_mesg/nuraft_mesg.hpp index 7efd255..2d9ddcf 100644 --- a/src/include/nuraft_mesg/nuraft_mesg.hpp +++ b/src/include/nuraft_mesg/nuraft_mesg.hpp @@ -19,11 +19,10 @@ #include #include +#include #include -#include #include "common.hpp" -#include "mesg_state_mgr.hpp" namespace grpc { class ByteBuffer; @@ -31,6 +30,7 @@ class Status; } // namespace grpc namespace sisl { +class GenericRpcData; class GrpcTokenVerifier; class GrpcTokenClient; using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::Status& status) >; @@ -38,6 +38,8 @@ using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc:: namespace nuraft_mesg { +class mesg_state_mgr; + // called by the server after it receives the request using data_service_request_handler_t = std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >; diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index aabc158..9342c31 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -7,13 +7,14 @@ add_library(${PROJECT_NAME}) target_sources(${PROJECT_NAME} PRIVATE manager_impl.cpp factory.cpp + repl_service_ctx.cpp service.cpp grpc_server.cpp ) target_link_libraries(${PROJECT_NAME} - sisl::sisl - nuraft::nuraft - ) + sisl::sisl + nuraft::nuraft +) add_library(${PROJECT_NAME}-data-svc OBJECT) target_sources(${PROJECT_NAME}-data-svc PRIVATE @@ -21,6 +22,6 @@ target_sources(${PROJECT_NAME}-data-svc PRIVATE ) settings_gen_cpp($ ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME}-data-svc nuraft_mesg_config.fbs) target_link_libraries(${PROJECT_NAME}-data-svc - sisl::sisl - nuraft::nuraft - ) + sisl::sisl + nuraft::nuraft +) diff --git a/src/lib/factory.cpp b/src/lib/factory.cpp index 502c627..36240c4 100644 --- a/src/lib/factory.cpp +++ b/src/lib/factory.cpp @@ -17,13 +17,10 @@ // grpc_factory static functions that makes for easy client creation. // #include -#include -#include #include -#include #include "client.hpp" -#include "nuraft_mesg/grpc_factory.hpp" +#include "nuraft_mesg/mesg_factory.hpp" namespace nuraft_mesg { diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index 0a33d89..c980376 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -14,9 +14,12 @@ #include #include -#include "service.hpp" #include "nuraft_mesg/mesg_factory.hpp" +#include "nuraft_mesg/mesg_state_mgr.hpp" #include "nuraft_mesg/nuraft_mesg.hpp" + +#include "repl_service_ctx.hpp" +#include "service.hpp" #include "logger.hpp" constexpr auto leader_change_timeout = std::chrono::milliseconds(3200); @@ -354,75 +357,6 @@ bool ManagerImpl::bind_data_service_request(std::string const& request_name, gro return _mesg_service->bind_data_service_request(request_name, group_id, request_handler); } -// The endpoint field of the raft_server config is the uuid of the server. -const std::string repl_service_ctx_grpc::id_to_str(int32_t const id) const { - auto const& srv_config = _server->get_config()->get_server(id); - return (srv_config) ? srv_config->get_endpoint() : std::string(); -} - -const std::optional< Result< peer_id_t > > repl_service_ctx_grpc::get_peer_id(destination_t const& dest) const { - if (std::holds_alternative< peer_id_t >(dest)) return std::get< peer_id_t >(dest); - if (std::holds_alternative< role_regex >(dest)) { - if (!_server) { - LOGW("server not initialized"); - return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND); - } - switch (std::get< role_regex >(dest)) { - case role_regex::LEADER: { - if (is_raft_leader()) return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); - auto const leader = _server->get_leader(); - if (leader == -1) return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND); - return boost::uuids::string_generator()(id_to_str(leader)); - } break; - case role_regex::ALL: { - return std::nullopt; - } break; - default: { - LOGE("Method not implemented"); - return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); - } break; - } - } - DEBUG_ASSERT(false, "Unknown destination type"); - return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); -} - -NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) { - return (!m_mesg_factory) - ? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND) - : m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf); -} - -AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) { - return (!m_mesg_factory) - ? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND) - : m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf); -} - -repl_service_ctx::repl_service_ctx(nuraft::raft_server* server) : _server(server) {} - -bool repl_service_ctx::is_raft_leader() const { return _server->is_leader(); } - -void repl_service_ctx::get_cluster_config(std::list< replica_config >& cluster_config) const { - auto const& srv_configs = _server->get_config()->get_servers(); - for (auto const& srv_config : srv_configs) { - cluster_config.emplace_back(replica_config{srv_config->get_endpoint(), srv_config->get_aux()}); - } -} - -repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) : - repl_service_ctx(server ? server->raft_server().get() : nullptr), m_mesg_factory(cli_factory) {} - -void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf, - boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { - serialize_to_byte_buffer(rpc_data->response(), outgoing_buf); - rpc_data->send_response(); -} - void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) { m_repl_svc_ctx = std::make_unique< repl_service_ctx_grpc >(server, cli_factory); } diff --git a/src/lib/manager_impl.hpp b/src/lib/manager_impl.hpp index 4d77507..a576156 100644 --- a/src/lib/manager_impl.hpp +++ b/src/lib/manager_impl.hpp @@ -23,6 +23,7 @@ #include +#include "nuraft_mesg/nuraft_mesg.hpp" #include "nuraft_mesg/mesg_factory.hpp" #include "common_lib.hpp" @@ -94,23 +95,4 @@ class ManagerImpl : public Manager { std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override; }; -class repl_service_ctx_grpc : public repl_service_ctx { -public: - repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory); - ~repl_service_ctx_grpc() override = default; - std::shared_ptr< mesg_factory > m_mesg_factory; - - NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name, - io_blob_list_t const& cli_buf) override; - AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) override; - void send_data_service_response(io_blob_list_t const& outgoing_buf, - boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override; - -private: - const std::optional< Result< peer_id_t > > get_peer_id(destination_t const& dest) const; - const std::string id_to_str(int32_t const id) const; -}; - } // namespace nuraft_mesg diff --git a/src/lib/proto/proto_client.cpp b/src/lib/proto/proto_client.cpp index 39f6353..569383d 100644 --- a/src/lib/proto/proto_client.cpp +++ b/src/lib/proto/proto_client.cpp @@ -67,6 +67,9 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms std::atomic_uint64_t grpc_base_client::_client_counter = 0ul; +/// +// This is where the magic of serialization happens starting with creating a RaftMessage and invoking our +// specific ::send() which will later transform into a RaftGroupMsg void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) { assert(req && complete); RaftMessage grpc_request; diff --git a/src/lib/proto/proto_service.cpp b/src/lib/proto/proto_service.cpp index 663034c..50ec70a 100644 --- a/src/lib/proto/proto_service.cpp +++ b/src/lib/proto/proto_service.cpp @@ -1,5 +1,3 @@ -#include "proto_service.hpp" - #include #include #include @@ -10,12 +8,51 @@ #include "nuraft_mesg/mesg_factory.hpp" #include "nuraft_mesg/nuraft_mesg.hpp" +#include "lib/service.hpp" + #include "messaging_service.grpc.pb.h" #include "utils.hpp" namespace nuraft_mesg { -using AsyncRaftSvc = Messaging::AsyncService; +static std::shared_ptr< nuraft::req_msg > toRequest(RaftMessage const& raft_msg) { + assert(raft_msg.has_rc_request()); + auto const& base = raft_msg.base(); + auto const& req = raft_msg.rc_request(); + auto message = + std::make_shared< nuraft::req_msg >(base.term(), (nuraft::msg_type)base.type(), base.src(), base.dest(), + req.last_log_term(), req.last_log_index(), req.commit_index()); + auto& log_entries = message->log_entries(); + for (auto const& log : req.log_entries()) { + auto log_buffer = nuraft::buffer::alloc(log.buffer().size()); + memcpy(log_buffer->data(), log.buffer().data(), log.buffer().size()); + log_entries.push_back(std::make_shared< nuraft::log_entry >(log.term(), log_buffer, + (nuraft::log_val_type)log.type(), log.timestamp())); + } + return message; +} + +static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) { + auto req = new RCResponse; + req->set_next_index(rcmsg.get_next_idx()); + req->set_accepted(rcmsg.get_accepted()); + req->set_result_code((ResultCode)(0 - rcmsg.get_result_code())); + auto ctx = rcmsg.get_ctx(); + if (ctx) { req->set_context(ctx->data(), ctx->container_size()); } + return req; +} + +class proto_service : public msg_service { + ::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply); + +public: + using msg_service::msg_service; + void associate(sisl::GrpcServer* server) override; + void bind(sisl::GrpcServer* server) override; + + // Incomming gRPC message + bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data); +}; void proto_service::associate(::sisl::GrpcServer* server) { msg_service::associate(server); @@ -28,13 +65,29 @@ void proto_service::associate(::sisl::GrpcServer* server) { void proto_service::bind(::sisl::GrpcServer* server) { msg_service::bind(server); if (!server->register_rpc< Messaging, RaftGroupMsg, RaftGroupMsg, false >( - "RaftStep", &AsyncRaftSvc::RequestRaftStep, + "RaftStep", &Messaging::AsyncService::RequestRaftStep, std::bind(&proto_service::raftStep, this, std::placeholders::_1))) { LOGE("Could not bind gRPC ::RaftStep to routine!"); abort(); } } +::grpc::Status proto_service::step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply) { + LOGT("Stepping [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())), + request.base().src(), request.base().dest()); + auto rcreq = toRequest(request); + auto resp = nuraft::raft_server_handler::process_req(&server, *rcreq); + if (!resp) { return ::grpc::Status(::grpc::StatusCode::CANCELLED, "Server rejected request"); } + assert(resp); + reply.set_allocated_base(fromBaseRequest(*resp)); + reply.set_allocated_rc_response(fromRCResponse(*resp)); + if (!resp->get_accepted()) { + auto const srv_conf = server.get_srv_config(reply.base().dest()); + if (srv_conf) { reply.mutable_rc_response()->set_dest_addr(srv_conf->get_endpoint()); } + } + return ::grpc::Status(); +} + bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data) { auto& request = rpc_data->request(); auto& response = rpc_data->response(); @@ -98,49 +151,6 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs return true; } -static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) { - auto req = new RCResponse; - req->set_next_index(rcmsg.get_next_idx()); - req->set_accepted(rcmsg.get_accepted()); - req->set_result_code((ResultCode)(0 - rcmsg.get_result_code())); - auto ctx = rcmsg.get_ctx(); - if (ctx) { req->set_context(ctx->data(), ctx->container_size()); } - return req; -} - -static std::shared_ptr< nuraft::req_msg > toRequest(RaftMessage const& raft_msg) { - assert(raft_msg.has_rc_request()); - auto const& base = raft_msg.base(); - auto const& req = raft_msg.rc_request(); - auto message = - std::make_shared< nuraft::req_msg >(base.term(), (nuraft::msg_type)base.type(), base.src(), base.dest(), - req.last_log_term(), req.last_log_index(), req.commit_index()); - auto& log_entries = message->log_entries(); - for (auto const& log : req.log_entries()) { - auto log_buffer = nuraft::buffer::alloc(log.buffer().size()); - memcpy(log_buffer->data(), log.buffer().data(), log.buffer().size()); - log_entries.push_back(std::make_shared< nuraft::log_entry >(log.term(), log_buffer, - (nuraft::log_val_type)log.type(), log.timestamp())); - } - return message; -} - -::grpc::Status proto_service::step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply) { - LOGT("Stepping [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())), - request.base().src(), request.base().dest()); - auto rcreq = toRequest(request); - auto resp = nuraft::raft_server_handler::process_req(&server, *rcreq); - if (!resp) { return ::grpc::Status(::grpc::StatusCode::CANCELLED, "Server rejected request"); } - assert(resp); - reply.set_allocated_base(fromBaseRequest(*resp)); - reply.set_allocated_rc_response(fromRCResponse(*resp)); - if (!resp->get_accepted()) { - auto const srv_conf = server.get_srv_config(reply.base().dest()); - if (srv_conf) { reply.mutable_rc_response()->set_dest_addr(srv_conf->get_endpoint()); } - } - return ::grpc::Status(); -} - std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, bool const enable_data_service) { return std::make_shared< proto_service >(get_server_ctx, service_address, enable_data_service); diff --git a/src/lib/proto/proto_service.hpp b/src/lib/proto/proto_service.hpp deleted file mode 100644 index 0766455..0000000 --- a/src/lib/proto/proto_service.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/// -// Copyright 2018 (c) eBay Corporation -// - -#pragma once - -#include "lib/service.hpp" - -namespace nuraft_mesg { - -class RaftGroupMsg; -class RaftMessage; -class Messaging; - -class proto_service : public msg_service { -public: - using msg_service::msg_service; - void associate(sisl::GrpcServer* server); - void bind(sisl::GrpcServer* server); - - bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data); - ::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply); -}; - -} // namespace nuraft_mesg diff --git a/src/lib/proto/utils.hpp b/src/lib/proto/utils.hpp index ba13c7a..6f32e18 100644 --- a/src/lib/proto/utils.hpp +++ b/src/lib/proto/utils.hpp @@ -13,16 +13,11 @@ * *********************************************************************************/ -// Brief: -// Common transformations -// #pragma once -#include "nuraft_mesg/mesg_factory.hpp" +#include #include "raft_types.pb.h" -#include "lib/common_lib.hpp" - namespace nuraft_mesg { inline RCMsgBase* fromBaseRequest(nuraft::msg_base const& rcbase) { @@ -34,6 +29,4 @@ inline RCMsgBase* fromBaseRequest(nuraft::msg_base const& rcbase) { return base; } -::grpc::Status step(nuraft::raft_server& raft_server, const RaftMessage& request, RaftMessage& reply); - } // namespace nuraft_mesg diff --git a/src/lib/repl_service_ctx.cpp b/src/lib/repl_service_ctx.cpp new file mode 100644 index 0000000..6aea648 --- /dev/null +++ b/src/lib/repl_service_ctx.cpp @@ -0,0 +1,82 @@ +#include "repl_service_ctx.hpp" + +#include +#include +#include + +#include "nuraft_mesg/mesg_factory.hpp" +#include "grpc_server.hpp" +#include "common_lib.hpp" + +namespace nuraft_mesg { + +// The endpoint field of the raft_server config is the uuid of the server. +const std::string repl_service_ctx_grpc::id_to_str(int32_t const id) const { + auto const& srv_config = _server->get_config()->get_server(id); + return (srv_config) ? srv_config->get_endpoint() : std::string(); +} + +const std::optional< Result< peer_id_t > > repl_service_ctx_grpc::get_peer_id(destination_t const& dest) const { + if (std::holds_alternative< peer_id_t >(dest)) return std::get< peer_id_t >(dest); + if (std::holds_alternative< role_regex >(dest)) { + if (!_server) { + LOGW("server not initialized"); + return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND); + } + switch (std::get< role_regex >(dest)) { + case role_regex::LEADER: { + if (is_raft_leader()) return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); + auto const leader = _server->get_leader(); + if (leader == -1) return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND); + return boost::uuids::string_generator()(id_to_str(leader)); + } break; + case role_regex::ALL: { + return std::nullopt; + } break; + default: { + LOGE("Method not implemented"); + return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); + } break; + } + } + DEBUG_ASSERT(false, "Unknown destination type"); + return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST); +} + +NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(destination_t const& dest, + std::string const& request_name, + io_blob_list_t const& cli_buf) { + return (!m_mesg_factory) + ? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND) + : m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf); +} + +AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest, + std::string const& request_name, + io_blob_list_t const& cli_buf) { + return (!m_mesg_factory) + ? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND) + : m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf); +} + +repl_service_ctx::repl_service_ctx(nuraft::raft_server* server) : _server(server) {} + +bool repl_service_ctx::is_raft_leader() const { return _server->is_leader(); } + +void repl_service_ctx::get_cluster_config(std::list< replica_config >& cluster_config) const { + auto const& srv_configs = _server->get_config()->get_servers(); + for (auto const& srv_config : srv_configs) { + cluster_config.emplace_back(replica_config{srv_config->get_endpoint(), srv_config->get_aux()}); + } +} + +repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) : + repl_service_ctx(server ? server->raft_server().get() : nullptr), m_mesg_factory(cli_factory) {} + +void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf, + boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { + serialize_to_byte_buffer(rpc_data->response(), outgoing_buf); + rpc_data->send_response(); +} + +} // namespace nuraft_mesg diff --git a/src/lib/repl_service_ctx.hpp b/src/lib/repl_service_ctx.hpp new file mode 100644 index 0000000..e1dd7c0 --- /dev/null +++ b/src/lib/repl_service_ctx.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +#include "nuraft_mesg/mesg_state_mgr.hpp" +#include "nuraft_mesg/common.hpp" + +namespace nuraft_mesg { + +class repl_service_ctx_grpc : public repl_service_ctx { +public: + repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory); + ~repl_service_ctx_grpc() override = default; + std::shared_ptr< mesg_factory > m_mesg_factory; + + NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name, + io_blob_list_t const& cli_buf) override; + AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest, + std::string const& request_name, + io_blob_list_t const& cli_buf) override; + void send_data_service_response(io_blob_list_t const& outgoing_buf, + boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override; + +private: + const std::optional< Result< peer_id_t > > get_peer_id(destination_t const& dest) const; + const std::string id_to_str(int32_t const id) const; +}; + +} // namespace nuraft_mesg diff --git a/src/lib/service.cpp b/src/lib/service.cpp index 1ae466c..f85c47f 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -1,13 +1,12 @@ -#include -#include +#include "service.hpp" + #include #include #include #include -#include "lib/service.hpp" - #include "nuraft_mesg/mesg_factory.hpp" +#include "nuraft_mesg/mesg_state_mgr.hpp" #include "nuraft_mesg/nuraft_mesg.hpp" SISL_OPTION_GROUP(nuraft_mesg, @@ -139,12 +138,14 @@ void msg_service::setDefaultGroupType(std::string const& _type) { } class msg_group_listner : public nuraft::rpc_listener { - std::shared_ptr< msg_service > _svc; + std::weak_ptr< msg_service > _svc; group_id_t _group; public: - msg_group_listner(std::shared_ptr< msg_service > svc, group_id_t const& group) : _svc(svc), _group(group) {} - ~msg_group_listner() { _svc->shutdown_for(_group); } + msg_group_listner(std::shared_ptr< msg_service > const& svc, group_id_t const& group) : _svc(svc), _group(group) {} + ~msg_group_listner() { + if (auto svc = _svc.lock(); svc) svc->shutdown_for(_group); + } void listen(nuraft::ptr< nuraft::msg_handler >&) override { LOGI("[group={}]", _group); } void stop() override { LOGI("[group={}]", _group); } @@ -162,7 +163,6 @@ void msg_service::shutdown_for(group_id_t const& group_id) { return; } } - _raft_servers_sync.notify_all(); } nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_id_t const& group_id, @@ -233,11 +233,6 @@ void msg_service::shutdown() { server->raft_server()->stop_server(); server->raft_server()->shutdown(); } - - { - std::unique_lock< lock_type > lck(_raft_servers_lock); - _raft_servers_sync.wait(lck, [this]() { return _raft_servers.empty(); }); - } LOGI("MessagingService shutdown complete."); } diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 5612d66..a948038 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -1,18 +1,16 @@ /// // Copyright 2018 (c) eBay Corporation // - #pragma once -#include -#include #include + +#include #include #include #include #include "grpc_server.hpp" - #include "manager_impl.hpp" #include "data_service_grpc.hpp" @@ -52,10 +50,6 @@ struct grpc_server_wrapper { class msg_service : public std::enable_shared_from_this< msg_service >, public nuraft::raft_server_handler { bool _data_service_enabled; data_service_t _data_service; - - std::mutex _raft_sync_lock; - std::condition_variable_any _raft_servers_sync; - std::string _default_group_type; protected: @@ -66,7 +60,6 @@ class msg_service : public std::enable_shared_from_this< msg_service >, public n public: msg_service(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service); - virtual ~msg_service(); static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service); diff --git a/src/tests/MessagingTest.cpp b/src/tests/MessagingTest.cpp index 074ebb4..6ef2b6c 100644 --- a/src/tests/MessagingTest.cpp +++ b/src/tests/MessagingTest.cpp @@ -29,7 +29,7 @@ #include "libnuraft/cluster_config.hxx" #include "libnuraft/state_machine.hxx" -#include "nuraft_mesg/common.hpp" +#include "nuraft_mesg/nuraft_mesg.hpp" #include "nuraft_mesg/mesg_factory.hpp" #include "test_state_manager.h" @@ -176,6 +176,12 @@ class MessagingFixtureBase : public ::testing::Test { app_3_->map_peers(lookup_map); } + void TearDown() override { + app_3_->instance_->leave_group(group_id_); + app_2_->instance_->leave_group(group_id_); + app_1_->instance_->leave_group(group_id_); + } + void start(bool data_svc_enabled = false) { app_1_->start(data_svc_enabled); app_2_->start(data_svc_enabled); @@ -216,10 +222,6 @@ TEST_F(MessagingFixture, ClientRequest) { {"op_type", 2}, }); EXPECT_TRUE(app_1_->instance_->append_entries(group_id_, {buf}).get()); - - app_3_->instance_->leave_group(group_id_); - app_2_->instance_->leave_group(group_id_); - app_1_->instance_->leave_group(group_id_); } // Basic resiliency test (append_entries) @@ -249,10 +251,6 @@ TEST_F(MessagingFixture, MemberCrash) { EXPECT_FALSE(app_3_->instance_->become_leader(boost::uuids::random_generator()()).get()); EXPECT_TRUE(app_3_->instance_->become_leader(group_id_).get()); EXPECT_TRUE(app_3_->instance_->append_entries(group_id_, {buf}).get()); - - app_3_->instance_->leave_group(group_id_); - app_2_->instance_->leave_group(group_id_); - app_1_->instance_->leave_group(group_id_); } // Test sending a message for a group the messaging service is not aware of. @@ -313,6 +311,7 @@ TEST_F(MessagingFixture, SyncAddMember) { srv_list.clear(); app_1_->instance_->get_srv_config_all(group_id_, srv_list); EXPECT_EQ(srv_list.size(), 4u); + app_4->instance_->leave_group(group_id_); } class DataServiceFixture : public MessagingFixtureBase { @@ -324,6 +323,7 @@ class DataServiceFixture : public MessagingFixtureBase { } void TearDown() override { + MessagingFixtureBase::TearDown(); for (auto& buf : cli_buf) { buf.buf_free(); } @@ -425,6 +425,13 @@ TEST_F(DataServiceFixture, BasicTest1) { // test_group: 4 (1 SEND_DATA) + 1 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 6 // data_service_test_group: 1 (1 REQUEST_DATA) + 4 (1 SEND_DATA) = 5 EXPECT_EQ(test_state_mgr::get_server_counter(), 11); + app_5->instance_->leave_group(data_group); + app_5->instance_->leave_group(group_id_); + app_4->instance_->leave_group(data_group); + app_4->instance_->leave_group(group_id_); + app_3_->instance_->leave_group(data_group); + app_2_->instance_->leave_group(data_group); + app_1_->instance_->leave_group(data_group); } TEST_F(DataServiceFixture, BasicTest2) { diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index 4e57376..0cc84ee 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -14,19 +14,23 @@ *********************************************************************************/ #include "test_state_manager.h" +#include #include +#include +#include #include -#include "jungle_logstore/jungle_log_store.h" -#include +#include #include #include +#include + +#include "nuraft_mesg/nuraft_mesg.hpp" +#include "jungle_logstore/jungle_log_store.h" -#include "nuraft_mesg/common.hpp" #include "test_state_machine.h" -#include -#include -#include + +#define STATE_PATH(g, s, f) fmt::format(FMT_STRING("{}_s{}{}"), (g), (s), (f)) using json = nlohmann::json; @@ -49,12 +53,12 @@ std::error_condition jsonObjectFromFile(std::string const& filename, json& json_ } std::error_condition loadConfigFile(json& config_map, nuraft_mesg::group_id_t const& _group_id, int32_t const _srv_id) { - auto const config_file = fmt::format(FMT_STRING("{}_s{}/config.json"), _group_id, _srv_id); + auto const config_file = STATE_PATH(_group_id, _srv_id, "/config.json"); return jsonObjectFromFile(config_file, config_map); } std::error_condition loadStateFile(json& state_map, nuraft_mesg::group_id_t const& _group_id, int32_t const _srv_id) { - auto const state_file = fmt::format(FMT_STRING("{}_s{}/state.json"), _group_id, _srv_id); + auto const state_file = STATE_PATH(_group_id, _srv_id, "/state.json"); return jsonObjectFromFile(state_file, state_map); } @@ -115,7 +119,7 @@ nuraft::ptr< nuraft::cluster_config > test_state_mgr::load_config() { } nuraft::ptr< nuraft::log_store > test_state_mgr::load_log_store() { - return nuraft::cs_new< nuraft::jungle_log_store >(fmt::format(FMT_STRING("{}_s{}"), _group_id, _srv_id)); + return nuraft::cs_new< nuraft::jungle_log_store >(STATE_PATH(_group_id, _srv_id, "")); } nuraft::ptr< nuraft::srv_state > test_state_mgr::read_state() { @@ -132,7 +136,7 @@ nuraft::ptr< nuraft::srv_state > test_state_mgr::read_state() { } void test_state_mgr::save_config(const nuraft::cluster_config& config) { - auto const config_file = fmt::format(FMT_STRING("{}_s{}/config.json"), _group_id, _srv_id); + auto const config_file = STATE_PATH(_group_id, _srv_id, "/config.json"); auto json_obj = json{{"log_idx", config.get_log_idx()}, {"prev_log_idx", config.get_prev_log_idx()}, {"eventual_consistency", config.is_async_replication()}, @@ -145,7 +149,7 @@ void test_state_mgr::save_config(const nuraft::cluster_config& config) { } void test_state_mgr::save_state(const nuraft::srv_state& state) { - auto const state_file = fmt::format(FMT_STRING("{}_s{}/state.json"), _group_id, _srv_id); + auto const state_file = STATE_PATH(_group_id, _srv_id, "/state.json"); auto json_obj = json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}}; try { @@ -160,7 +164,13 @@ std::shared_ptr< nuraft::state_machine > test_state_mgr::get_state_machine() { return std::static_pointer_cast< nuraft::state_machine >(_state_machine); } -void test_state_mgr::permanent_destroy() {} +test_state_mgr::~test_state_mgr() { + if (auto path = std::filesystem::weakly_canonical(STATE_PATH(_group_id, _srv_id, "")); + _will_destroy && std::filesystem::exists(path)) + std::filesystem::remove_all(path); +} + +void test_state_mgr::permanent_destroy() { _will_destroy = true; } void test_state_mgr::leave() {} diff --git a/src/tests/test_state_manager.h b/src/tests/test_state_manager.h index 86b07cb..449fe46 100644 --- a/src/tests/test_state_manager.h +++ b/src/tests/test_state_manager.h @@ -14,19 +14,22 @@ *********************************************************************************/ #pragma once -#include "nuraft_mesg/nuraft_mesg.hpp" +#include "nuraft_mesg/mesg_state_mgr.hpp" #include class test_state_machine; namespace nuraft_mesg { +class Manager; class service; -} +} // namespace nuraft_mesg class test_state_mgr : public nuraft_mesg::mesg_state_mgr { + bool _will_destroy{false}; + public: test_state_mgr(int32_t srv_id, nuraft_mesg::peer_id_t const& srv_addr, nuraft_mesg::group_id_t const& group_id); - ~test_state_mgr() override = default; + ~test_state_mgr() override; nuraft::ptr< nuraft::cluster_config > load_config() override; void save_config(const nuraft::cluster_config& config) override;