diff --git a/CMakeLists.txt b/CMakeLists.txt index 050f031..7114ad3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10) project(nuraft_mesg) enable_testing() -set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD 20) if(EXISTS ${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) @@ -11,10 +11,8 @@ else() message(WARNING "Conan Build file does not exist, trying to build without!") endif() -if (${CMAKE_BUILD_TYPE} STREQUAL Debug) - if (NOT ${CONAN_SETTINGS_COMPILER} STREQUAL "clang" AND NOT ${CONAN_SETTINGS_COMPILER} STREQUAL "apple-clang") +if (${CMAKE_BUILD_TYPE} STREQUAL Debug AND ${CMAKE_CXX_COMPILER_ID} STREQUAL GNU) include (cmake/debug_flags.cmake) - endif() endif() if (${MEMORY_SANITIZER_ON}) include (cmake/mem_sanitizer.cmake) diff --git a/src/include/nuraft_mesg/common.hpp b/src/include/nuraft_mesg/common.hpp new file mode 100644 index 0000000..73ff360 --- /dev/null +++ b/src/include/nuraft_mesg/common.hpp @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +SISL_LOGGING_DECL(nuraft) +SISL_LOGGING_DECL(nuraft_mesg) + +namespace nuraft_mesg { + +using peer_id_t = boost::uuids::uuid; +using group_id_t = boost::uuids::uuid; +using group_type_t = std::string; + +using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; + +template < typename T > +using Result = folly::Expected< T, nuraft::cmd_result_code >; +template < typename T > +using AsyncResult = folly::SemiFuture< Result< T > >; + +} // namespace nuraft_mesg + +namespace fmt { +template <> +struct formatter< nuraft_mesg::group_id_t > { + template < typename ParseContext > + constexpr auto parse(ParseContext& ctx) { + return ctx.begin(); + } + + template < typename FormatContext > + auto format(nuraft_mesg::group_id_t const& n, FormatContext& ctx) { + return format_to(ctx.out(), "{}", to_string(n)); + } +}; + +template <> +struct formatter< nuraft::cmd_result_code > { + template < typename ParseContext > + constexpr auto parse(ParseContext& ctx) { + return ctx.begin(); + } + + template < typename FormatContext > + auto format(nuraft::cmd_result_code const& c, FormatContext& ctx) { + return format_to(ctx.out(), "{}", int32_t(c)); + } +}; +} // namespace fmt diff --git a/src/include/grpc_factory.hpp b/src/include/nuraft_mesg/grpc_factory.hpp similarity index 87% rename from src/include/grpc_factory.hpp rename to src/include/nuraft_mesg/grpc_factory.hpp index 256dc16..3ef6d88 100644 --- a/src/include/grpc_factory.hpp +++ b/src/include/nuraft_mesg/grpc_factory.hpp @@ -24,6 +24,8 @@ #include #include +#include "common.hpp" + namespace nuraft_mesg { using client_factory_lock_type = folly::SharedMutex; @@ -33,7 +35,7 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share protected: client_factory_lock_type _client_lock; - std::map< std::string, std::shared_ptr< nuraft::rpc_client > > _clients; + std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients; public: grpc_factory(int const cli_thread_count, std::string const& name); @@ -43,9 +45,9 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override; - virtual std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + virtual std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; - virtual std::error_condition reinit_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) = 0; + virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; // Construct and send an AddServer message to the cluster std::future< nuraft::cmd_result_code > add_server(uint32_t const srv_id, std::string const& srv_addr, diff --git a/src/lib/grpc_server.hpp b/src/include/nuraft_mesg/grpc_server.hpp similarity index 66% rename from src/lib/grpc_server.hpp rename to src/include/nuraft_mesg/grpc_server.hpp index eb32170..5b53e69 100644 --- a/src/lib/grpc_server.hpp +++ b/src/include/nuraft_mesg/grpc_server.hpp @@ -1,21 +1,3 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ - -// Brief: -// Translates and forwards the gRPC Step() to cornerstone's raft_server::step() -// #pragma once #include @@ -29,6 +11,9 @@ namespace nuraft_mesg { class RaftMessage; +// Brief: +// Translates and forwards the gRPC Step() to cornerstone's raft_server::step() +// class grpc_server : public nuraft::raft_server_handler { std::shared_ptr< nuraft::raft_server > _raft_server; diff --git a/src/include/mesg_factory.hpp b/src/include/nuraft_mesg/mesg_factory.hpp similarity index 74% rename from src/include/mesg_factory.hpp rename to src/include/nuraft_mesg/mesg_factory.hpp index 4d01562..0b358fc 100644 --- a/src/include/mesg_factory.hpp +++ b/src/include/nuraft_mesg/mesg_factory.hpp @@ -19,10 +19,12 @@ #include #include +#include + #include "grpc_factory.hpp" #include #include -#include "mesg_service.hpp" +#include "nuraft_mesg.hpp" namespace sisl { struct io_blob; @@ -35,37 +37,37 @@ class group_factory : public grpc_factory { static std::string m_ssl_cert; public: - group_factory(int const cli_thread_count, std::string const& name, + group_factory(int const cli_thread_count, boost::uuids::uuid const& name, std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = ""); using grpc_factory::create_client; - std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override; - std::error_condition reinit_client(std::string const& client, + std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override; + std::error_condition reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) override; - virtual std::string lookupEndpoint(std::string const& client) = 0; + virtual std::string lookupEndpoint(peer_id_t const& client) = 0; }; class mesg_factory final : public grpc_factory { std::shared_ptr< group_factory > _group_factory; - group_name_t const _group_name; + group_id_t const _group_name; group_type_t const _group_type; std::shared_ptr< sisl::MetricsGroupWrapper > _metrics; public: - mesg_factory(std::shared_ptr< group_factory > g_factory, group_name_t const& grp_id, group_type_t const& grp_type, + mesg_factory(std::shared_ptr< group_factory > g_factory, group_id_t const& grp_id, group_type_t const& grp_type, std::shared_ptr< sisl::MetricsGroupWrapper > metrics = nullptr) : - grpc_factory(0, grp_id), + grpc_factory(0, to_string(grp_id)), _group_factory(g_factory), _group_name(grp_id), _group_type(grp_type), _metrics(metrics) {} - group_name_t group_name() const { return _group_name; } + group_id_t group_name() const { return _group_name; } - std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override; + std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override; - std::error_condition reinit_client(const std::string& client, + std::error_condition reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) override; AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf); diff --git a/src/include/mesg_state_mgr.hpp b/src/include/nuraft_mesg/mesg_state_mgr.hpp similarity index 80% rename from src/include/mesg_state_mgr.hpp rename to src/include/nuraft_mesg/mesg_state_mgr.hpp index aebc249..5afad0b 100644 --- a/src/include/mesg_state_mgr.hpp +++ b/src/include/nuraft_mesg/mesg_state_mgr.hpp @@ -2,13 +2,9 @@ #include -#include -#include -#include -#include - #include -#include + +#include "common.hpp" namespace boost { template < class T > @@ -24,13 +20,6 @@ namespace nuraft_mesg { class mesg_factory; class grpc_server; -using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; - -template < typename T > -using Result = folly::Expected< T, std::error_condition >; -template < typename T > -using AsyncResult = folly::SemiFuture< Result< T > >; - class repl_service_ctx { public: repl_service_ctx(grpc_server* server); diff --git a/src/include/mesg_service.hpp b/src/include/nuraft_mesg/nuraft_mesg.hpp similarity index 70% rename from src/include/mesg_service.hpp rename to src/include/nuraft_mesg/nuraft_mesg.hpp index 795bdb8..d465dce 100644 --- a/src/include/mesg_service.hpp +++ b/src/include/nuraft_mesg/nuraft_mesg.hpp @@ -23,10 +23,9 @@ #include #include +#include "common.hpp" #include "mesg_state_mgr.hpp" -SISL_LOGGING_DECL(nuraft) - namespace grpc { class ByteBuffer; class Status; @@ -40,9 +39,6 @@ using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc:: namespace nuraft_mesg { -using group_name_t = std::string; -using group_type_t = std::string; - // called by the server after it receives the request using data_service_request_handler_t = std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >; @@ -53,14 +49,14 @@ using NullAsyncResult = AsyncResult< folly::Unit >; class MessagingApplication { public: virtual ~MessagingApplication() = default; - virtual std::string lookup_peer(std::string const&) = 0; - virtual std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, group_name_t const& group_id) = 0; + virtual std::string lookup_peer(peer_id_t const&) = 0; + virtual std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, group_id_t const& group_id) = 0; }; class Manager { public: struct Params { - std::string server_uuid_; + boost::uuids::uuid server_uuid_; uint32_t mesg_port_; group_type_t default_group_type_; std::string ssl_key_; @@ -74,32 +70,33 @@ class Manager { // Register a new group type virtual void register_mgr_type(group_type_t const& group_type, group_params const&) = 0; - virtual std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_name_t const& group_id) const = 0; - virtual NullAsyncResult create_group(group_name_t const& group_id, group_type_t const& group_type) = 0; - virtual NullResult join_group(group_name_t const& group_id, group_type_t const& group_type, + virtual std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_id_t const& group_id) const = 0; + virtual NullAsyncResult create_group(group_id_t const& group_id, group_type_t const& group_type) = 0; + virtual NullResult join_group(group_id_t const& group_id, group_type_t const& group_type, std::shared_ptr< mesg_state_mgr >) = 0; // Send a client request to the cluster - virtual NullAsyncResult add_member(group_name_t const& group_id, std::string const& server_id) = 0; - virtual NullAsyncResult rem_member(group_name_t const& group_id, std::string const& server_id) = 0; - virtual NullAsyncResult become_leader(group_name_t const& group_id) = 0; - virtual NullAsyncResult client_request(group_name_t const& group_id, std::shared_ptr< nuraft::buffer >&) = 0; + virtual NullAsyncResult add_member(group_id_t const& group_id, peer_id_t const& server_id) = 0; + virtual NullAsyncResult rem_member(group_id_t const& group_id, peer_id_t const& server_id) = 0; + virtual NullAsyncResult become_leader(group_id_t const& group_id) = 0; + virtual NullAsyncResult append_entries(group_id_t const& group_id, + std::vector< std::shared_ptr< nuraft::buffer > > const&) = 0; // Misc Mgmt - virtual void get_srv_config_all(group_name_t const& group_id, + virtual void get_srv_config_all(group_id_t const& group_id, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) = 0; - virtual void leave_group(group_name_t const& group_id) = 0; - virtual void append_peers(group_name_t const& group_id, std::list< std::string >&) const = 0; - virtual uint32_t logstore_id(group_name_t const& group_id) const = 0; + virtual void leave_group(group_id_t const& group_id) = 0; + virtual void append_peers(group_id_t const& group_id, std::list< peer_id_t >&) const = 0; + virtual uint32_t logstore_id(group_id_t const& group_id) const = 0; virtual int32_t server_id() const = 0; virtual void restart_server() = 0; // data channel APIs - virtual bool bind_data_service_request(std::string const& request_name, group_name_t const& group_id, + virtual bool bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const&) = 0; }; -extern int32_t to_server_id(std::string const& server_addr); +extern int32_t to_server_id(peer_id_t const& server_addr); extern std::shared_ptr< Manager > init_messaging(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool with_data_svc = false); diff --git a/src/lib/data_service.hpp b/src/lib/data_service.hpp deleted file mode 100644 index 7315c7e..0000000 --- a/src/lib/data_service.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include - -namespace sisl { -struct io_blob; -class GenericRpcData; -} // namespace sisl -namespace nuraft_mesg { - -using data_service_request_handler_t = - std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >; -using data_service_comp_handler_t = std::function< void(boost::intrusive_ptr< sisl::GenericRpcData >&) >; - -class data_service { - -public: - data_service() = default; - virtual ~data_service() = default; - - // start the data service channel - virtual void associate() = 0; - - // register a new rpc - virtual bool bind(std::string const& request_name, std::string const& group_id, - data_service_request_handler_t const& request_cb) = 0; - - // register all the existing rpcs - virtual void bind() = 0; -}; - -} // namespace nuraft_mesg diff --git a/src/lib/data_service_grpc.cpp b/src/lib/data_service_grpc.cpp index 155fc19..ef39d6c 100644 --- a/src/lib/data_service_grpc.cpp +++ b/src/lib/data_service_grpc.cpp @@ -1,9 +1,9 @@ +#include #include + #include "data_service_grpc.hpp" #include "utils.hpp" -SISL_LOGGING_DECL(nuraft_mesg) - namespace nuraft_mesg { void data_service_grpc::set_grpc_server(sisl::GrpcServer* server) { _grpc_server = server; } @@ -23,7 +23,7 @@ void data_service_grpc::bind() { } } -bool data_service_grpc::bind(std::string const& request_name, std::string const& group_id, +bool data_service_grpc::bind(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_cb) { RELEASE_ASSERT(_grpc_server, "NULL _grpc_server!"); if (!request_cb) { diff --git a/src/lib/data_service_grpc.hpp b/src/lib/data_service_grpc.hpp index 20e1dbd..b2abe41 100644 --- a/src/lib/data_service_grpc.hpp +++ b/src/lib/data_service_grpc.hpp @@ -3,7 +3,8 @@ #include #include #include -#include "data_service.hpp" + +#include "nuraft_mesg/nuraft_mesg.hpp" namespace sisl { struct io_blob; @@ -12,7 +13,7 @@ namespace nuraft_mesg { using data_lock_type = folly::SharedMutex; -class data_service_grpc : public data_service { +class data_service_grpc { // key: group_id, value: map // value_key: request name, value_value: handler // Different groups can have same request name. @@ -28,10 +29,10 @@ class data_service_grpc : public data_service { data_service_grpc(data_service_grpc const&) = delete; data_service_grpc& operator=(data_service_grpc const&) = delete; - void associate() override; - void bind() override; - bool bind(std::string const& request_name, std::string const& group_id, - data_service_request_handler_t const& request_cb) override; + void associate(); + void bind(); + bool bind(std::string const& request_name, group_id_t const& group_id, + data_service_request_handler_t const& request_cb); void set_grpc_server(sisl::GrpcServer* server); }; diff --git a/src/lib/grpc_client.hpp b/src/lib/grpc_client.hpp index d246b87..2c996ab 100644 --- a/src/lib/grpc_client.hpp +++ b/src/lib/grpc_client.hpp @@ -23,7 +23,7 @@ #include #include -SISL_LOGGING_DECL(nuraft_mesg) +#include "nuraft_mesg/common.hpp" namespace nuraft_mesg { diff --git a/src/lib/grpc_factory.cpp b/src/lib/grpc_factory.cpp index 64d48c6..7370d5e 100644 --- a/src/lib/grpc_factory.cpp +++ b/src/lib/grpc_factory.cpp @@ -16,10 +16,11 @@ // Brief: // grpc_factory static functions that makes for easy client creation. // +#include #include #include "grpc_client.hpp" -#include "grpc_factory.hpp" +#include "nuraft_mesg/grpc_factory.hpp" #include "proto/raft_types.pb.h" namespace nuraft_mesg { @@ -131,13 +132,14 @@ class grpc_error_client : public grpc_base_client { nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(const std::string& client) { nuraft::ptr< nuraft::rpc_client > new_client; + auto client_uuid = boost::uuids::string_generator()(client); std::unique_lock< client_factory_lock_type > lk(_client_lock); - auto [it, happened] = _clients.emplace(client, nullptr); + auto [it, happened] = _clients.emplace(client_uuid, nullptr); if (_clients.end() != it) { if (!happened) { LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client); - if (auto err = reinit_client(client, it->second); err) { + if (auto err = reinit_client(client_uuid, it->second); err) { LOGERROR("Failed to re-initialize client {}: {}", client, err.message()); new_client = std::make_shared< grpc_error_client >(); } else { @@ -145,7 +147,7 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(const std::string& } } else { LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client); - if (auto err = create_client(client, it->second); err) { + if (auto err = create_client(client_uuid, it->second); err) { LOGERROR("Failed to create client for {}: {}", client, err.message()); new_client = std::make_shared< grpc_error_client >(); } else { diff --git a/src/lib/grpc_server.cpp b/src/lib/grpc_server.cpp index 86f8ad9..95b697d 100644 --- a/src/lib/grpc_server.cpp +++ b/src/lib/grpc_server.cpp @@ -16,12 +16,10 @@ // Brief: // grpc_server step function and response transformations // -#include "grpc_server.hpp" +#include "nuraft_mesg/grpc_server.hpp" #include "utils.hpp" #include "proto/raft_types.pb.h" -SISL_LOGGING_DECL(nuraft_mesg) - namespace nuraft_mesg { static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) { diff --git a/src/lib/logger.hpp b/src/lib/logger.hpp index 56e05d7..d7081db 100644 --- a/src/lib/logger.hpp +++ b/src/lib/logger.hpp @@ -17,15 +17,15 @@ #include #include -SISL_LOGGING_DECL(nuraft) -SISL_LOGGING_DECL(nuraft_mesg) +#include "nuraft_mesg/common.hpp" class nuraft_mesg_logger : public ::nuraft::logger { - std::string const _group_id; + nuraft_mesg::group_id_t const _group_id; std::shared_ptr< sisl::logging::logger_t > _custom_logger; public: - explicit nuraft_mesg_logger(std::string const& group_id, std::shared_ptr< sisl::logging::logger_t > custom_logger) : + explicit nuraft_mesg_logger(nuraft_mesg::group_id_t const& group_id, + std::shared_ptr< sisl::logging::logger_t > custom_logger) : ::nuraft::logger(), _group_id(group_id), _custom_logger(custom_logger) {} void set_level(int l) override { diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index 3110b2e..14909f1 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -5,7 +5,6 @@ #include #include -#include #include #include #include @@ -17,13 +16,11 @@ #include #include "service.hpp" -#include "mesg_factory.hpp" -#include "mesg_service.hpp" +#include "nuraft_mesg/mesg_factory.hpp" +#include "nuraft_mesg/nuraft_mesg.hpp" #include "logger.hpp" #include "utils.hpp" -SISL_LOGGING_DECL(nuraft_mesg) - constexpr auto cfg_change_timeout = std::chrono::milliseconds(200); constexpr auto leader_change_timeout = std::chrono::milliseconds(3200); constexpr auto grpc_client_threads = 1u; @@ -62,9 +59,9 @@ static std::error_condition convertToError(nuraft::cmd_result_code rc) { } } -int32_t to_server_id(std::string const& server_addr) { +int32_t to_server_id(peer_id_t const& server_addr) { boost::hash< boost::uuids::uuid > uuid_hasher; - return uuid_hasher(boost::uuids::string_generator()(server_addr)) >> 33; + return uuid_hasher(server_addr) >> 33; } class engine_factory : public group_factory { @@ -76,7 +73,7 @@ class engine_factory : public group_factory { start_params.ssl_cert_), application_(app) {} - std::string lookupEndpoint(std::string const& client) override { + std::string lookupEndpoint(peer_id_t const& client) override { LOGTRACEMOD(nuraft_mesg, "[peer={}]", client); if (auto a = application_.lock(); a) return a->lookup_peer(client); return std::string(); @@ -103,7 +100,8 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes } catch (spdlog::spdlog_ex const& e) { _custom_logger = spdlog::details::registry::instance().get(logger_name); } sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%t] %v", _custom_logger); - nuraft::ptr< nuraft::logger > logger = std::make_shared< nuraft_mesg_logger >("scheduler", _custom_logger); + nuraft::ptr< nuraft::logger > logger = + std::make_shared< nuraft_mesg_logger >(start_params_.server_uuid_, _custom_logger); // RAFT request scheduler nuraft::asio_service::options service_options; @@ -114,8 +112,8 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes // allowing sharing of the Server and client amongst raft instances. _mesg_service = msg_service::create( - [this](int32_t const srv_id, group_name_t const& group_id, group_type_t const& group_type, - nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics) mutable -> std::error_condition { + [this](int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, nuraft::context*& ctx, + std::shared_ptr< group_metrics > metrics) mutable -> nuraft::cmd_result_code { return this->group_init(srv_id, group_id, group_type, ctx, metrics); }, start_params_.server_uuid_, and_data_svc); @@ -140,7 +138,7 @@ void ManagerImpl::restart_server() { _mesg_service->bind(_grpc_server.get()); } -void ManagerImpl::register_mgr_type(std::string const& group_type, group_params const& params) { +void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params const& params) { std::lock_guard< std::mutex > lg(_manager_lock); auto [it, happened] = _state_mgr_types.emplace(std::make_pair(group_type, params)); DEBUG_ASSERT(_state_mgr_types.end() != it, "Out of memory?"); @@ -148,7 +146,7 @@ void ManagerImpl::register_mgr_type(std::string const& group_type, group_params if (_state_mgr_types.end() == it) { LOGERROR("Could not register group type: {}", group_type); } } -nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(std::string const& group_id, nuraft::cb_func::Type type, +nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param) { switch (type) { case nuraft::cb_func::RemovedFromCluster: { @@ -189,7 +187,7 @@ nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(std::string const& gro return nuraft::cb_func::Ok; } -void ManagerImpl::exit_group(std::string const& group_id) { +void ManagerImpl::exit_group(group_id_t const& group_id) { std::shared_ptr< mesg_state_mgr > mgr; { std::lock_guard< std::mutex > lg(_manager_lock); @@ -198,9 +196,9 @@ void ManagerImpl::exit_group(std::string const& group_id) { if (mgr) mgr->leave(); } -std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string const& group_id, - std::string const& group_type, nuraft::context*& ctx, - std::shared_ptr< nuraft_mesg::group_metrics > metrics) { +nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t const& group_id, + group_type_t const& group_type, nuraft::context*& ctx, + std::shared_ptr< nuraft_mesg::group_metrics > metrics) { LOGDEBUGMOD(nuraft_mesg, "Creating context for Group: {} as Member: {}", group_id, srv_id); // State manager (RAFT log store, config) @@ -211,7 +209,7 @@ std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string c std::lock_guard< std::mutex > lg(_manager_lock); auto def_group = _state_mgr_types.end(); if (def_group = _state_mgr_types.find(group_type); _state_mgr_types.end() == def_group) { - return std::make_error_condition(std::errc::invalid_argument); + return nuraft::cmd_result_code::SERVER_NOT_FOUND; } params = def_group->second; @@ -226,7 +224,7 @@ std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string c sm = it->second->get_state_machine(); smgr = it->second; } else { - return std::make_error_condition(std::errc::not_enough_memory); + return nuraft::cmd_result_code::CANCELLED; } } @@ -243,14 +241,14 @@ std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string c return this->callback_handler(group_id, type, param); }); - return std::error_condition(); + return nuraft::cmd_result_code::OK; } -NullAsyncResult ManagerImpl::add_member(std::string const& group_id, std::string const& new_id) { - return _mesg_service->add_srv(group_id, nuraft::srv_config(to_server_id(new_id), new_id)) - .deferValue([this, g_id = group_id, n_id = new_id](auto cmd_result) mutable -> NullResult { - auto result = cmd_result.value(); - if (nuraft::OK != result) return folly::makeUnexpected(convertToError(result)); +NullAsyncResult ManagerImpl::add_member(group_id_t const& group_id, peer_id_t const& new_id) { + auto str_id = to_string(new_id); + return _mesg_service->add_srv(group_id, nuraft::srv_config(to_server_id(new_id), str_id)) + .deferValue([this, g_id = group_id, n_id = std::move(str_id)](auto cmd_result) mutable -> NullResult { + if (!cmd_result) return folly::makeUnexpected(cmd_result.error()); // TODO This should not block, but attach a new promise! auto lk = std::unique_lock< std::mutex >(_manager_lock); if (!_config_change.wait_for( @@ -261,35 +259,29 @@ NullAsyncResult ManagerImpl::add_member(std::string const& group_id, std::string [n_id = std::move(n_id)](const std::shared_ptr< nuraft::srv_config >& cfg) { return n_id == cfg->get_endpoint(); }) != srv_list.end(); - })) - return folly::makeUnexpected(std::make_error_condition(std::errc::connection_aborted)); - ; + })) { + return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); + } return folly::Unit(); }); } -NullAsyncResult ManagerImpl::rem_member(std::string const& group_id, std::string const& old_id) { - return _mesg_service->rm_srv(group_id, to_server_id(old_id)) - .deferValue([this, group_id](auto cmd_result) mutable -> NullResult { - auto result = cmd_result.value(); - if (nuraft::OK != result) { - if (nuraft::SERVER_NOT_FOUND == result) { - LOGWARN("Messaging service does not know of group: [{}]", group_id); - } - LOGERROR("Unknown failure to add member: [{}]", static_cast< uint32_t >(result)); - return folly::makeUnexpected(std::make_error_condition(std::errc::connection_aborted)); - } - return folly::Unit(); - }); +NullAsyncResult ManagerImpl::append_entries(group_id_t const& group_id, + std::vector< std::shared_ptr< nuraft::buffer > > const& buf) { + return _mesg_service->append_entries(group_id, buf); +} + +NullAsyncResult ManagerImpl::rem_member(group_id_t const& group_id, peer_id_t const& old_id) { + return _mesg_service->rm_srv(group_id, to_server_id(old_id)); } -std::shared_ptr< mesg_state_mgr > ManagerImpl::lookup_state_manager(std::string const& group_id) const { +std::shared_ptr< mesg_state_mgr > ManagerImpl::lookup_state_manager(group_id_t const& group_id) const { std::lock_guard< std::mutex > lg(_manager_lock); if (auto it = _state_managers.find(group_id); _state_managers.end() != it) return it->second; return nullptr; } -NullAsyncResult ManagerImpl::create_group(std::string const& group_id, std::string const& group_type_name) { +NullAsyncResult ManagerImpl::create_group(group_id_t const& group_id, std::string const& group_type_name) { { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader.insert(std::make_pair(group_id, false)); @@ -304,19 +296,18 @@ NullAsyncResult ManagerImpl::create_group(std::string const& group_id, std::stri auto lk = std::unique_lock< std::mutex >(_manager_lock); if (!_config_change.wait_for(lk, leader_change_timeout, [this, g_id = std::move(g_id)]() { return _is_leader[g_id]; })) { - return folly::makeUnexpected(std::make_error_condition(std::errc::timed_out)); + return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); } return folly::Unit(); }); } -NullResult ManagerImpl::join_group(std::string const& group_id, std::string const& group_type, +NullResult ManagerImpl::join_group(group_id_t const& group_id, group_type_t const& group_type, std::shared_ptr< mesg_state_mgr > smgr) { { std::lock_guard< std::mutex > lg(_manager_lock); auto [it, happened] = _state_managers.emplace(group_id, smgr); - if (_state_managers.end() == it) - return folly::makeUnexpected(std::make_error_condition(std::errc::not_enough_memory)); + if (_state_managers.end() == it) return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); } if (auto const err = _mesg_service->joinRaftGroup(_srv_id, group_id, group_type); err) { std::lock_guard< std::mutex > lg(_manager_lock); @@ -327,18 +318,20 @@ NullResult ManagerImpl::join_group(std::string const& group_id, std::string cons return folly::Unit(); } -void ManagerImpl::append_peers(std::string const& group_id, std::list< std::string >& servers) const { - std::lock_guard< std::mutex > lg(_manager_lock); - if (auto it = _state_managers.find(group_id); _state_managers.end() != it) { - if (auto config = it->second->load_config(); config) { - for (auto const& server : config->get_servers()) { - servers.push_back(server->get_endpoint()); - } +void ManagerImpl::append_peers(group_id_t const& group_id, std::list< peer_id_t >& servers) const { + auto it = _state_managers.end(); + { + std::lock_guard< std::mutex > lg(_manager_lock); + if (it = _state_managers.find(group_id); _state_managers.end() == it) return; + } + if (auto config = it->second->load_config(); config) { + for (auto const& server : config->get_servers()) { + servers.push_back(boost::uuids::string_generator()(server->get_endpoint())); } } } -NullAsyncResult ManagerImpl::become_leader(std::string const& group_id) { +NullAsyncResult ManagerImpl::become_leader(group_id_t const& group_id) { { auto lk = std::unique_lock< std::mutex >(_manager_lock); if (_is_leader[group_id]) { return folly::Unit(); } @@ -355,16 +348,17 @@ NullAsyncResult ManagerImpl::become_leader(std::string const& group_id) { // Do not sleep on the last try if (max_retries != 1) { std::this_thread::sleep_for(std::chrono::milliseconds(leader_change_timeout)); } } - if (!request_success) return folly::makeUnexpected(std::make_error_condition(std::errc::timed_out)); + if (!request_success) return folly::makeUnexpected(nuraft::cmd_result_code::TIMEOUT); + auto lk = std::unique_lock< std::mutex >(_manager_lock); if (!_config_change.wait_for(lk, leader_change_timeout, [this, g_id = std::move(g_id)]() { return _is_leader[g_id]; })) - return folly::makeUnexpected(std::make_error_condition(std::errc::timed_out)); + return folly::makeUnexpected(nuraft::cmd_result_code::TIMEOUT); return folly::Unit(); }); } -void ManagerImpl::leave_group(std::string const& group_id) { +void ManagerImpl::leave_group(group_id_t const& group_id) { LOGINFO("Leaving group [vol={}]", group_id); { std::lock_guard< std::mutex > lg(_manager_lock); @@ -386,25 +380,18 @@ void ManagerImpl::leave_group(std::string const& group_id) { LOGINFO("Finished leaving: [vol={}]", group_id); } -NullAsyncResult ManagerImpl::client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >& buf) { - return _mesg_service->append_entries(group_id, {buf}).deferValue([](auto cmd_result) -> NullResult { - auto result = cmd_result.value(); - if (nuraft::OK != result) return folly::makeUnexpected(convertToError(result)); - return folly::Unit(); - }); -} -uint32_t ManagerImpl::logstore_id(std::string const& group_id) const { +uint32_t ManagerImpl::logstore_id(group_id_t const& group_id) const { std::lock_guard< std::mutex > lg(_manager_lock); if (auto it = _state_managers.find(group_id); _state_managers.end() != it) { return it->second->get_logstore_id(); } return UINT32_MAX; } -void ManagerImpl::get_srv_config_all(std::string const& group_name, +void ManagerImpl::get_srv_config_all(group_id_t const& group_name, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { _mesg_service->get_srv_config_all(group_name, configs_out); } -bool ManagerImpl::bind_data_service_request(std::string const& request_name, std::string const& group_id, +bool ManagerImpl::bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) { return _mesg_service->bind_data_service_request(request_name, group_id, request_handler); } @@ -413,7 +400,7 @@ AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request(std::st io_blob_list_t const& cli_buf) { return (m_mesg_factory) ? m_mesg_factory->data_service_request(request_name, cli_buf) - : folly::makeUnexpected(std::make_error_condition(std::errc::no_such_device)); + : folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND); } repl_service_ctx::repl_service_ctx(grpc_server* server) : m_server(server) {} diff --git a/src/lib/manager_impl.hpp b/src/lib/manager_impl.hpp index 9272996..8ad917a 100644 --- a/src/lib/manager_impl.hpp +++ b/src/lib/manager_impl.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -24,7 +25,7 @@ #include -#include "mesg_factory.hpp" +#include "nuraft_mesg/mesg_factory.hpp" namespace sisl { class GrpcServer; @@ -39,7 +40,7 @@ class ManagerImpl : public Manager { Manager::Params start_params_; int32_t _srv_id; - std::map< std::string, Manager::group_params > _state_mgr_types; + std::map< group_type_t, Manager::group_params > _state_mgr_types; std::weak_ptr< MessagingApplication > application_; std::shared_ptr< group_factory > _g_factory; @@ -47,19 +48,19 @@ class ManagerImpl : public Manager { std::unique_ptr< ::sisl::GrpcServer > _grpc_server; std::mutex mutable _manager_lock; - std::map< std::string, std::shared_ptr< mesg_state_mgr > > _state_managers; + std::map< group_id_t, std::shared_ptr< mesg_state_mgr > > _state_managers; std::condition_variable _config_change; - std::map< std::string, bool > _is_leader; + std::map< group_id_t, bool > _is_leader; nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler; std::shared_ptr< sisl::logging::logger_t > _custom_logger; - std::error_condition group_init(int32_t const srv_id, std::string const& group_id, std::string const& group_type, - nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics); - nuraft::cb_func::ReturnCode callback_handler(std::string const& group_id, nuraft::cb_func::Type type, + nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, + nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics); + nuraft::cb_func::ReturnCode callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param); - void exit_group(std::string const& group_id); + void exit_group(group_id_t const& group_id); public: ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false); @@ -67,29 +68,30 @@ class ManagerImpl : public Manager { int32_t server_id() const override { return _srv_id; } - void register_mgr_type(std::string const& group_type, group_params const&) override; + void register_mgr_type(group_type_t const& group_type, group_params const&) override; - std::shared_ptr< mesg_state_mgr > lookup_state_manager(std::string const& group_id) const override; + std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_id_t const& group_id) const override; - NullAsyncResult create_group(std::string const& group_id, std::string const& group_type) override; - NullResult join_group(std::string const& group_id, std::string const& group_type, + NullAsyncResult create_group(group_id_t const& group_id, group_type_t const& group_type) override; + NullResult join_group(group_id_t const& group_id, group_type_t const& group_type, std::shared_ptr< mesg_state_mgr > smgr) override; - virtual NullAsyncResult add_member(std::string const& group_id, std::string const& server_id) override; - virtual NullAsyncResult rem_member(std::string const& group_id, std::string const& server_id) override; - virtual NullAsyncResult become_leader(std::string const& group_id) override; - virtual NullAsyncResult client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >&) override; + virtual NullAsyncResult add_member(group_id_t const& group_id, peer_id_t const& server_id) override; + virtual NullAsyncResult rem_member(group_id_t const& group_id, peer_id_t const& server_id) override; + virtual NullAsyncResult become_leader(group_id_t const& group_id) override; + virtual NullAsyncResult append_entries(group_id_t const& group_id, + std::vector< std::shared_ptr< nuraft::buffer > > const&) override; - void leave_group(std::string const& group_id) override; - uint32_t logstore_id(std::string const& group_id) const override; - void append_peers(std::string const& group_id, std::list< std::string >&) const override; + void leave_group(group_id_t const& group_id) override; + uint32_t logstore_id(group_id_t const& group_id) const override; + void append_peers(group_id_t const& group_id, std::list< peer_id_t >&) const override; void restart_server() override; // data service APIs - bool bind_data_service_request(std::string const& request_name, std::string const& group_id, + bool bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) override; - void get_srv_config_all(std::string const& group_name, + void get_srv_config_all(group_id_t const& group_name, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override; }; diff --git a/src/lib/mesg_client.cpp b/src/lib/mesg_client.cpp index f949865..bc7e833 100644 --- a/src/lib/mesg_client.cpp +++ b/src/lib/mesg_client.cpp @@ -13,18 +13,17 @@ * *********************************************************************************/ #include "grpc_client.hpp" +#include #include #include -#include "mesg_factory.hpp" +#include "nuraft_mesg/mesg_factory.hpp" #include "service.hpp" #include "proto/messaging_service.grpc.pb.h" #include "utils.hpp" #include "nuraft_mesg_config.hpp" -SISL_LOGGING_DECL(nuraft_mesg) - namespace nuraft_mesg { std::string group_factory::m_ssl_cert; @@ -78,7 +77,7 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha [c = copyable_promise](grpc::ByteBuffer& resp, ::grpc::Status& status) mutable { if (!status.ok()) { LOGERRORMOD(nuraft_mesg, "Failed to send data_service_request, error: {}", status.error_message()); - c->setValue(folly::makeUnexpected(std::make_error_condition(std::errc::connection_aborted))); + c->setValue(folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED)); } else { sisl::io_blob svr_buf; deserialize_from_byte_buffer(resp, svr_buf); @@ -96,21 +95,20 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha class group_client : public grpc_base_client { std::shared_ptr< messaging_client > _client; - group_name_t const _group_name; + group_id_t const _group_name; group_type_t const _group_type; std::shared_ptr< group_metrics > _metrics; std::string const _client_addr; public: - group_client(std::shared_ptr< messaging_client > client, std::string const& client_addr, - group_name_t const& grp_name, group_type_t const& grp_type, - std::shared_ptr< sisl::MetricsGroupWrapper > metrics) : + group_client(std::shared_ptr< messaging_client > client, peer_id_t const& client_addr, group_id_t const& grp_name, + group_type_t const& grp_type, std::shared_ptr< sisl::MetricsGroupWrapper > metrics) : grpc_base_client(), _client(client), _group_name(grp_name), _group_type(grp_type), _metrics(std::static_pointer_cast< group_metrics >(metrics)), - _client_addr(client_addr) {} + _client_addr(to_string(client_addr)) {} ~group_client() override = default; @@ -125,7 +123,7 @@ class group_client : public grpc_base_client { message.base().dest(), _group_name); if (_metrics) { COUNTER_INCREMENT(*_metrics, group_sends, 1); } group_msg.set_intended_addr(_client_addr); - group_msg.set_group_name(_group_name); + group_msg.set_group_name(to_string(_group_name)); group_msg.set_group_type(_group_type); group_msg.mutable_msg()->CopyFrom(message); _client->send(group_msg, complete); @@ -136,17 +134,17 @@ class group_client : public grpc_base_client { } }; -std::error_condition mesg_factory::create_client(const std::string& client, +std::error_condition mesg_factory::create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& raft_client) { // Re-direct this call to a global factory so we can re-use clients to the same endpoints LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); - auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(client)); + auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(to_string(client))); if (!m_client) { return std::make_error_condition(std::errc::connection_aborted); } raft_client = std::make_shared< group_client >(m_client, client, _group_name, _group_type, _metrics); return (!raft_client) ? std::make_error_condition(std::errc::invalid_argument) : std::error_condition(); } -std::error_condition mesg_factory::reinit_client(const std::string& client, +std::error_condition mesg_factory::reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); auto g_client = std::dynamic_pointer_cast< group_client >(raft_client); @@ -167,13 +165,13 @@ AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string cons return folly::collectAnyWithoutException(calls).deferValue([](auto&& p) { return p.second; }); } -group_factory::group_factory(int const cli_thread_count, std::string const& name, +group_factory::group_factory(int const cli_thread_count, group_id_t const& name, std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert) : - grpc_factory(cli_thread_count, name), m_token_client(token_client) { + grpc_factory(cli_thread_count, to_string(name)), m_token_client(token_client) { m_ssl_cert = ssl_cert; } -std::error_condition group_factory::create_client(const std::string& client, +std::error_condition group_factory::create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); auto endpoint = lookupEndpoint(client); @@ -185,7 +183,7 @@ std::error_condition group_factory::create_client(const std::string& client, return (!raft_client) ? std::make_error_condition(std::errc::connection_aborted) : std::error_condition(); } -std::error_condition group_factory::reinit_client(const std::string& client, +std::error_condition group_factory::reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) { LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); assert(raft_client); diff --git a/src/lib/service.cpp b/src/lib/service.cpp index 0736ea3..a13be15 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -1,43 +1,52 @@ -/// -// Copyright 2018 (c) eBay Corporation -// -// Authors: -// Brian Szmyd -// -// Brief: -// Messaging service routines -// - +#include +#include +#include +#include +#include #include -#include "grpcpp/impl/codegen/status_code_enum.h" -#include "libnuraft/async.hxx" -#include "libnuraft/rpc_listener.hxx" #include "service.hpp" -#include "mesg_factory.hpp" -#include "mesg_service.hpp" - -SISL_LOGGING_DECL(nuraft_mesg) +#include "nuraft_mesg/mesg_factory.hpp" +#include "nuraft_mesg/nuraft_mesg.hpp" SISL_OPTION_GROUP(nuraft_mesg, (messaging_metrics, "", "msg_metrics", "Gather metrics from SD Messaging", cxxopts::value< bool >(), "")) +#define CONTINUE_RESP(resp) \ + try { \ + if (auto r = (resp)->get_result_code(); r != nuraft::RESULT_NOT_EXIST_YET) { \ + if (nuraft::OK == r) return folly::Unit(); \ + return folly::makeUnexpected(r); \ + } \ + auto [p, sf] = folly::makePromiseContract< NullResult >(); \ + (resp)->when_ready( \ + [p = std::make_shared< decltype(p) >(std::move(p))]( \ + nuraft::cmd_result< nuraft::ptr< nuraft::buffer >, nuraft::ptr< std::exception > >& result, \ + auto& e) mutable { \ + if (nuraft::cmd_result_code::OK != result.get_result_code()) \ + p->setValue(folly::makeUnexpected(result.get_result_code())); \ + else \ + p->setValue(folly::Unit()); \ + }); \ + return std::move(sf); \ + } catch (std::runtime_error & rte) { LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what()); } + namespace nuraft_mesg { using AsyncRaftSvc = Messaging::AsyncService; -grpc_server_wrapper::grpc_server_wrapper(group_name_t const& group_name) { +grpc_server_wrapper::grpc_server_wrapper(group_id_t const& group_name) { if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_name); } -msg_service::msg_service(get_server_ctx_cb get_server_ctx, std::string const& service_address, +msg_service::msg_service(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, bool const enable_data_service) : _get_server_ctx(get_server_ctx), _service_address(service_address), _data_service_enabled(enable_data_service) {} -std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, std::string const& service_address, +std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, bool const enable_data_service) { return std::shared_ptr< msg_service >(new msg_service(get_server_ctx, service_address, enable_data_service), [](msg_service* p) { delete p; }); @@ -71,7 +80,7 @@ void msg_service::bind(::sisl::GrpcServer* server) { if (_data_service_enabled) { _data_service.bind(); } } -bool msg_service::bind_data_service_request(std::string const& request_name, std::string const& group_id, +bool msg_service::bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) { if (!_data_service_enabled) { LOGERRORMOD(nuraft_mesg, "Could not register data service method {}; data service is null", request_name); @@ -80,54 +89,27 @@ bool msg_service::bind_data_service_request(std::string const& request_name, std return _data_service.bind(request_name, group_id, request_handler); } -AsyncResult< nuraft::cmd_result_code > msg_service::add_srv(group_name_t const& group_name, - nuraft::srv_config const& cfg) { +NullAsyncResult msg_service::add_srv(group_id_t const& group_name, nuraft::srv_config const& cfg) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } } - if (server) { - try { - auto res_p = server->add_srv(cfg); - if (auto r = res_p->get_result_code(); r != nuraft::RESULT_NOT_EXIST_YET) return r; - auto [p, sf] = folly::makePromiseContract< Result< nuraft::cmd_result_code > >(); - res_p->when_ready( - [p = std::make_shared< decltype(p) >(std::move(p))]( - nuraft::cmd_result< nuraft::ptr< nuraft::buffer >, nuraft::ptr< std::exception > >& result, - auto& e) mutable { p->setValue(result.get_result_code()); }); - return std::move(sf); - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); - } - } - return nuraft::SERVER_NOT_FOUND; + if (server) { CONTINUE_RESP(server->add_srv(cfg)) } + return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); } -AsyncResult< nuraft::cmd_result_code > msg_service::rm_srv(group_name_t const& group_name, int const member_id) { +NullAsyncResult msg_service::rm_srv(group_id_t const& group_name, int const member_id) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } } - if (server) { - try { - auto res_p = server->rem_srv(member_id); - if (auto r = res_p->get_result_code(); r != nuraft::RESULT_NOT_EXIST_YET) return r; - auto [p, sf] = folly::makePromiseContract< Result< nuraft::cmd_result_code > >(); - res_p->when_ready( - [p = std::make_shared< decltype(p) >(std::move(p))]( - nuraft::cmd_result< nuraft::ptr< nuraft::buffer >, nuraft::ptr< std::exception > >& result, - auto& e) mutable { p->setValue(result.get_result_code()); }); - return std::move(sf); - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during rm_srv(): {}", rte.what()); - } - } - return nuraft::SERVER_NOT_FOUND; + if (server) { CONTINUE_RESP(server->rem_srv(member_id)) } + return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); } -bool msg_service::request_leadership(group_name_t const& group_name) { +bool msg_service::request_leadership(group_id_t const& group_name) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); @@ -143,7 +125,7 @@ bool msg_service::request_leadership(group_name_t const& group_name) { return false; } -void msg_service::get_srv_config_all(group_name_t const& group_name, +void msg_service::get_srv_config_all(group_id_t const& group_name, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { std::shared_ptr< grpc_server > server; @@ -161,28 +143,15 @@ void msg_service::get_srv_config_all(group_name_t const& group_name, } } -AsyncResult< nuraft::cmd_result_code > -msg_service::append_entries(group_name_t const& group_name, std::vector< nuraft::ptr< nuraft::buffer > > const& logs) { +NullAsyncResult msg_service::append_entries(group_id_t const& group_name, + std::vector< nuraft::ptr< nuraft::buffer > > const& logs) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } } - if (server) { - try { - auto res_p = server->append_entries(logs); - if (auto r = res_p->get_result_code(); r != nuraft::RESULT_NOT_EXIST_YET) return r; - auto [p, sf] = folly::makePromiseContract< Result< nuraft::cmd_result_code > >(); - auto sp = std::make_shared< decltype(p) >(std::move(p)); - res_p->when_ready( - [sp](nuraft::cmd_result< nuraft::ptr< nuraft::buffer >, nuraft::ptr< std::exception > >& result, - auto& e) mutable { sp->setValue(result.get_result_code()); }); - return std::move(sf); - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); - } - } - return nuraft::SERVER_NOT_FOUND; + if (server) { CONTINUE_RESP(server->append_entries(logs)) } + return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); } void msg_service::setDefaultGroupType(std::string const& _type) { @@ -196,9 +165,21 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, auto const& group_name = request.group_name(); auto const& intended_addr = request.intended_addr(); + auto gid = boost::uuids::uuid(); + auto sid = boost::uuids::uuid(); + try { + gid = boost::uuids::string_generator()(group_name); + sid = boost::uuids::string_generator()(intended_addr); + } catch (std::runtime_error const& e) { + LOGWARNMOD(nuraft_mesg, "Recieved mesg for {}:{} which is not a valid UUID!", group_name, intended_addr); + rpc_data->set_status( + ::grpc::Status(::grpc::INVALID_ARGUMENT, fmt::format(FMT_STRING("Bad GroupID {}"), group_name))); + return true; + } + // Verify this is for the service it was intended for auto const& base = request.msg().base(); - if (intended_addr != _service_address) { + if (sid != _service_address) { LOGWARNMOD(nuraft_mesg, "Recieved mesg for {} intended for {}, we are {}", nuraft::msg_type_to_string(nuraft::msg_type(base.type())), intended_addr, _service_address); rpc_data->set_status(::grpc::Status( @@ -213,13 +194,13 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, // JoinClusterRequests are expected to be received upon Cluster creation by the current leader. We need // to initialize a RaftServer context based on the corresponding type prior to servicing this request. This // should emplace a corresponding server in the _raft_servers member. - if (nuraft::join_cluster_request == base.type()) { joinRaftGroup(base.dest(), group_name, request.group_type()); } + if (nuraft::join_cluster_request == base.type()) { joinRaftGroup(base.dest(), gid, request.group_type()); } // Find the RaftServer context based on the name of the group. std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { + if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) { if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1); server = it->second.m_server; } @@ -265,10 +246,10 @@ class null_service final : public grpc_server { class msg_group_listner : public nuraft::rpc_listener { std::shared_ptr< msg_service > _svc; - group_name_t _group; + group_id_t _group; public: - msg_group_listner(std::shared_ptr< msg_service > svc, group_name_t const& group) : _svc(svc), _group(group) {} + msg_group_listner(std::shared_ptr< msg_service > svc, group_id_t const& group) : _svc(svc), _group(group) {} ~msg_group_listner() { _svc->shutdown_for(_group); } void listen(nuraft::ptr< nuraft::msg_handler >& handler) override { @@ -278,7 +259,7 @@ class msg_group_listner : public nuraft::rpc_listener { void shutdown() override { LOGINFOMOD(nuraft_mesg, "Shutdown {}", _group); } }; -void msg_service::shutdown_for(group_name_t const& group_name) { +void msg_service::shutdown_for(group_id_t const& group_name) { { std::unique_lock< lock_type > lck(_raft_servers_lock); LOGDEBUGMOD(nuraft_mesg, "Shutting down RAFT group: {}", group_name); @@ -292,8 +273,8 @@ void msg_service::shutdown_for(group_name_t const& group_name) { _raft_servers_sync.notify_all(); } -std::error_condition msg_service::joinRaftGroup(int32_t const srv_id, group_name_t const& group_name, - group_type_t const& group_type) { +nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_id_t const& group_name, + group_type_t const& group_type) { LOGINFOMOD(nuraft_mesg, "Joining RAFT group: {}, type: {}", group_name, group_type); nuraft::context* ctx{nullptr}; @@ -307,8 +288,7 @@ std::error_condition msg_service::joinRaftGroup(int32_t const srv_id, group_name std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_name, group_name)); if (_raft_servers.end() != it && happened) { if (auto err = _get_server_ctx(srv_id, group_name, g_type, ctx, it->second.m_metrics); err) { - LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, - err.message()); + LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, err); return err; } DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!"); @@ -323,10 +303,10 @@ std::error_condition msg_service::joinRaftGroup(int32_t const srv_id, group_name } } } - return std::error_condition(); + return nuraft::cmd_result_code::OK; } -void msg_service::partRaftGroup(group_name_t const& group_name) { +void msg_service::partRaftGroup(group_id_t const& group_name) { std::shared_ptr< grpc_server > server; { diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 5df730b..56180be 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -5,12 +5,14 @@ #pragma once #include +#include #include #include -#include "grpc_server.hpp" #include #include +#include "nuraft_mesg/grpc_server.hpp" + #include "proto/messaging_service.grpc.pb.h" #include "manager_impl.hpp" #include "data_service_grpc.hpp" @@ -20,9 +22,6 @@ namespace nuraft_mesg { template < typename T > using shared = std::shared_ptr< T >; -using group_name_t = std::string; -using group_type_t = std::string; - class msg_service; class mesg_factory; class repl_service_ctx_grpc; @@ -31,8 +30,8 @@ using lock_type = folly::SharedMutex; class group_metrics : public sisl::MetricsGroupWrapper { public: - explicit group_metrics(group_name_t const& group_name) : - sisl::MetricsGroupWrapper("RAFTGroup", group_name.c_str()) { + explicit group_metrics(group_id_t const& group_name) : + sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_name).c_str()) { REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"}); REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"}); register_me_to_farm(); @@ -42,14 +41,14 @@ class group_metrics : public sisl::MetricsGroupWrapper { }; using get_server_ctx_cb = - std::function< std::error_condition(int32_t srv_id, group_name_t const&, group_type_t const&, - nuraft::context*& ctx_out, std::shared_ptr< group_metrics > metrics) >; + std::function< nuraft::cmd_result_code(int32_t srv_id, group_id_t const&, group_type_t const&, + nuraft::context*& ctx_out, std::shared_ptr< group_metrics > metrics) >; // pluggable type for data service using data_service_t = data_service_grpc; struct grpc_server_wrapper { - explicit grpc_server_wrapper(group_name_t const& group_name); + explicit grpc_server_wrapper(group_id_t const& group_name); std::shared_ptr< grpc_server > m_server; std::shared_ptr< group_metrics > m_metrics; @@ -60,17 +59,17 @@ class msg_service : public std::enable_shared_from_this< msg_service > { std::mutex _raft_sync_lock; std::condition_variable_any _raft_servers_sync; lock_type _raft_servers_lock; - std::map< group_name_t, grpc_server_wrapper > _raft_servers; - std::string const _service_address; + std::map< group_id_t, grpc_server_wrapper > _raft_servers; + peer_id_t const _service_address; std::string _default_group_type; data_service_t _data_service; bool _data_service_enabled; - msg_service(get_server_ctx_cb get_server_ctx, std::string const& service_address, bool const enable_data_service); + msg_service(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service); ~msg_service(); public: - static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, std::string const& service_address, + static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service); msg_service(msg_service const&) = delete; @@ -78,18 +77,18 @@ class msg_service : public std::enable_shared_from_this< msg_service > { void shutdown(); - AsyncResult< nuraft::cmd_result_code > add_srv(group_name_t const& group_name, nuraft::srv_config const& cfg); - AsyncResult< nuraft::cmd_result_code > append_entries(group_name_t const& group_name, - std::vector< nuraft::ptr< nuraft::buffer > > const& logs); - AsyncResult< nuraft::cmd_result_code > rm_srv(group_name_t const& group_name, int const member_id); + NullAsyncResult add_srv(group_id_t const& group_name, nuraft::srv_config const& cfg); + NullAsyncResult append_entries(group_id_t const& group_name, + std::vector< nuraft::ptr< nuraft::buffer > > const& logs); + NullAsyncResult rm_srv(group_id_t const& group_name, int const member_id); - bool request_leadership(group_name_t const& group_name); - void get_srv_config_all(group_name_t const& group_name, + bool request_leadership(group_id_t const& group_name); + void get_srv_config_all(group_id_t const& group_name, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out); void associate(sisl::GrpcServer* server); void bind(sisl::GrpcServer* server); - bool bind_data_service_request(std::string const& request_name, std::string const& group_id, + bool bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler); //::grpc::Status raftStep(RaftGroupMsg& request, RaftGroupMsg& response); @@ -97,17 +96,17 @@ class msg_service : public std::enable_shared_from_this< msg_service > { void setDefaultGroupType(std::string const& _type); - std::error_condition createRaftGroup(int const srv_id, group_name_t const& group_name, - group_type_t const& group_type) { + nuraft::cmd_result_code createRaftGroup(int const srv_id, group_id_t const& group_name, + group_type_t const& group_type) { return joinRaftGroup(srv_id, group_name, group_type); } - std::error_condition joinRaftGroup(int32_t srv_id, group_name_t const& group_name, group_type_t const&); + nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_name, group_type_t const&); - void partRaftGroup(group_name_t const& group_name); + void partRaftGroup(group_id_t const& group_name); // Internal intent only - void shutdown_for(group_name_t const&); + void shutdown_for(group_id_t const&); }; } // namespace nuraft_mesg diff --git a/src/lib/utils.hpp b/src/lib/utils.hpp index e48677a..d8a2291 100644 --- a/src/lib/utils.hpp +++ b/src/lib/utils.hpp @@ -18,7 +18,7 @@ // #pragma once -#include "mesg_factory.hpp" +#include "nuraft_mesg/mesg_factory.hpp" #include "proto/raft_types.pb.h" namespace nuraft_mesg { @@ -53,7 +53,7 @@ static grpc::Status deserialize_from_byte_buffer(grpc::ByteBuffer const& cli_byt // generic rpc server looks up rpc name in a map and calls the corresponding callback. To avoid another lookup in this // layer, we registed one callback for each (group_id, request_name) pair. The rpc_name is their concatenation. -static std::string get_generic_method_name(std::string const& request_name, std::string const& group_id) { +static std::string get_generic_method_name(std::string const& request_name, group_id_t const& group_id) { return fmt::format("{}|{}", request_name, group_id); } diff --git a/src/tests/MessagingTest.cpp b/src/tests/MessagingTest.cpp index d60ab67..1d765b2 100644 --- a/src/tests/MessagingTest.cpp +++ b/src/tests/MessagingTest.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -31,7 +30,8 @@ #include "libnuraft/cluster_config.hxx" #include "libnuraft/state_machine.hxx" -#include "mesg_factory.hpp" +#include "nuraft_mesg/common.hpp" +#include "nuraft_mesg/mesg_factory.hpp" #include "test_state_manager.h" #include @@ -51,27 +51,26 @@ class TestApplication : public MessagingApplication, public std::enable_shared_f public: std::string name_; uint32_t port_; - std::string id_; + boost::uuids::uuid id_; std::shared_ptr< Manager > instance_; TestApplication(std::string const& name, uint32_t port) : name_(name), port_(port) { - id_ = to_string(boost::uuids::random_generator()()); + id_ = boost::uuids::random_generator()(); } ~TestApplication() override = default; - void set_id(std::string const& id) { id_ = id; } - std::string lookup_peer(std::string const& peer) override { + void set_id(boost::uuids::uuid const& id) { id_ = id; } + + std::string lookup_peer(peer_id_t const& peer) override { auto lg = std::scoped_lock(lookup_lock_); return (lookup_map_.count(peer) > 0) ? lookup_map_[peer] : std::string(); } - std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, std::string const& group_id) override { - auto [it, happened] = state_mgr_map.emplace( - std::make_pair(group_id + "_" + name_, std::make_shared< test_state_mgr >(srv_id, id_, group_id))); - return std::static_pointer_cast< mesg_state_mgr >(it->second); + std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, group_id_t const& group_id) override { + return std::static_pointer_cast< mesg_state_mgr >(std::make_shared< test_state_mgr >(srv_id, id_, group_id)); } - void map_peers(std::map< std::string, std::string > const& peers) { + void map_peers(std::map< nuraft_mesg::peer_id_t, std::string > const& peers) { auto lg = std::scoped_lock(lookup_lock_); lookup_map_ = peers; } @@ -96,9 +95,7 @@ class TestApplication : public MessagingApplication, public std::enable_shared_f private: std::mutex lookup_lock_; - std::map< std::string, std::string > lookup_map_; - - std::map< std::string, std::shared_ptr< test_state_mgr > > state_mgr_map; + std::map< nuraft_mesg::peer_id_t, std::string > lookup_map_; }; extern nuraft::ptr< nuraft::cluster_config > fromClusterConfig(nlohmann::json const& cluster_config); @@ -125,10 +122,9 @@ class MessagingFixtureBase : public ::testing::Test { std::shared_ptr< TestApplication > app_3_; std::vector< uint32_t > ports; - std::map< std::string, std::string > lookup_map; + std::map< nuraft_mesg::peer_id_t, std::string > lookup_map; - // Store state mgrs for each instance and group. key is "group_id" + "_sm{instance_number}". - std::map< std::string, std::pair< std::shared_ptr< test_state_mgr >, Manager* > > state_mgr_map; + group_id_t group_id_; void get_random_ports(const uint16_t n) { auto cur_size = ports.size(); @@ -160,14 +156,16 @@ class MessagingFixtureBase : public ::testing::Test { app_2_->start(data_svc_enabled); app_3_->start(data_svc_enabled); - EXPECT_TRUE(!!app_1_->instance_->create_group("test_group", "test_type").get()); + group_id_ = boost::uuids::random_generator()(); + + EXPECT_TRUE(!!app_1_->instance_->create_group(group_id_, "test_type").get()); std::this_thread::sleep_for(std::chrono::seconds(1)); - auto add2 = app_1_->instance_->add_member("test_group", app_2_->id_); + auto add2 = app_1_->instance_->add_member(group_id_, app_2_->id_); std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(std::move(add2).get()); - auto add3 = app_1_->instance_->add_member("test_group", app_3_->id_); + auto add3 = app_1_->instance_->add_member(group_id_, app_3_->id_); std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(std::move(add3).get()); } @@ -186,11 +184,11 @@ TEST_F(MessagingFixture, ClientRequest) { auto buf = nuraft_mesg::create_message(nlohmann::json{ {"op_type", 2}, }); - EXPECT_TRUE(!!app_1_->instance_->client_request("test_group", buf).get()); + EXPECT_TRUE(!!app_1_->instance_->append_entries(group_id_, {buf}).get()); - app_3_->instance_->leave_group("test_group"); - app_2_->instance_->leave_group("test_group"); - app_1_->instance_->leave_group("test_group"); + app_3_->instance_->leave_group(group_id_); + app_2_->instance_->leave_group(group_id_); + app_1_->instance_->leave_group(group_id_); } // Basic resiliency test (append_entries) @@ -203,7 +201,7 @@ TEST_F(MessagingFixture, ClientReset) { auto buf = nuraft_mesg::create_message(nlohmann::json{ {"op_type", 2}, }); - EXPECT_TRUE(!!app_1_->instance_->client_request("test_group", buf).get()); + EXPECT_TRUE(!!app_1_->instance_->append_entries(group_id_, {buf}).get()); app_3_ = std::make_shared< TestApplication >("sm3", ports[2]); app_3_->set_id(our_id); @@ -211,47 +209,47 @@ TEST_F(MessagingFixture, ClientReset) { app_3_->start(); std::this_thread::sleep_for(std::chrono::seconds(5)); - auto err = app_3_->instance_->client_request("test_group", buf).get(); - if (!!err) { LOGERROR("Failed to commit: {}", err.error().message()); } + auto err = app_3_->instance_->append_entries(group_id_, {buf}).get(); + if (!!err) { LOGERROR("Failed to commit: {}", err.error()); } EXPECT_FALSE(err); - app_3_->instance_->leave_group("test_group"); - app_2_->instance_->leave_group("test_group"); - app_1_->instance_->leave_group("test_group"); + app_3_->instance_->leave_group(group_id_); + app_2_->instance_->leave_group(group_id_); + app_1_->instance_->leave_group(group_id_); } // Test sending a message for a group the messaging service is not aware of. TEST_F(MessagingFixture, UnknownGroup) { - auto add = app_1_->instance_->add_member("unknown_group", to_string(boost::uuids::random_generator()())); + auto add = app_1_->instance_->add_member(boost::uuids::random_generator()(), boost::uuids::random_generator()()); std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(std::move(add).get()); - app_1_->instance_->leave_group("unknown_group"); + app_1_->instance_->leave_group(boost::uuids::random_generator()()); auto buf = nuraft_mesg::create_message(nlohmann::json{ {"op_type", 2}, }); - EXPECT_FALSE(app_1_->instance_->client_request("unknown_group", buf).get()); + EXPECT_FALSE(app_1_->instance_->append_entries(boost::uuids::random_generator()(), {buf}).get()); } TEST_F(MessagingFixture, RemoveMember) { - EXPECT_TRUE(app_1_->instance_->rem_member("test_group", app_3_->id_).get()); + EXPECT_TRUE(app_1_->instance_->rem_member(group_id_, app_3_->id_).get()); auto buf = nuraft_mesg::create_message(nlohmann::json{ {"op_type", 2}, }); - EXPECT_TRUE(!!app_1_->instance_->client_request("test_group", buf).get()); + EXPECT_TRUE(!!app_1_->instance_->append_entries(group_id_, {buf}).get()); } TEST_F(MessagingFixture, SyncAddMember) { std::vector< std::shared_ptr< nuraft::srv_config > > srv_list; - app_1_->instance_->get_srv_config_all("test_group", srv_list); + app_1_->instance_->get_srv_config_all(group_id_, srv_list); EXPECT_EQ(srv_list.size(), 3u); srv_list.clear(); - app_2_->instance_->get_srv_config_all("test_group", srv_list); + app_2_->instance_->get_srv_config_all(group_id_, srv_list); EXPECT_EQ(srv_list.size(), 3u); srv_list.clear(); - app_3_->instance_->get_srv_config_all("test_group", srv_list); + app_3_->instance_->get_srv_config_all(group_id_, srv_list); EXPECT_EQ(srv_list.size(), 3u); get_random_ports(1u); @@ -262,12 +260,12 @@ TEST_F(MessagingFixture, SyncAddMember) { app_3_->map_peers(lookup_map); app_4->map_peers(lookup_map); app_4->start(); - auto add = app_1_->instance_->add_member("test_group", app_4->id_); + auto add = app_1_->instance_->add_member(group_id_, app_4->id_); std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(std::move(add).get()); srv_list.clear(); - app_1_->instance_->get_srv_config_all("test_group", srv_list); + app_1_->instance_->get_srv_config_all(group_id_, srv_list); EXPECT_EQ(srv_list.size(), 4u); } diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index 6557c9a..1244e46 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -21,6 +21,7 @@ #include #include +#include "nuraft_mesg/common.hpp" #include "test_state_machine.h" #include #include @@ -46,12 +47,12 @@ std::error_condition jsonObjectFromFile(std::string const& filename, json& json_ return std::error_condition(); } -std::error_condition loadConfigFile(json& config_map, std::string const& _group_id, int32_t const _srv_id) { +std::error_condition loadConfigFile(json& config_map, nuraft_mesg::group_id_t const& _group_id, int32_t const _srv_id) { auto const config_file = fmt::format(FMT_STRING("{}_s{}/config.json"), _group_id, _srv_id); return jsonObjectFromFile(config_file, config_map); } -std::error_condition loadStateFile(json& state_map, std::string const& _group_id, int32_t const _srv_id) { +std::error_condition loadStateFile(json& state_map, nuraft_mesg::group_id_t const& _group_id, int32_t const _srv_id) { auto const state_file = fmt::format(FMT_STRING("{}_s{}/state.json"), _group_id, _srv_id); return jsonObjectFromFile(state_file, state_map); } @@ -95,11 +96,12 @@ nuraft::ptr< nuraft::cluster_config > fromClusterConfig(json const& cluster_conf return raft_config; } -test_state_mgr::test_state_mgr(int32_t srv_id, std::string const& srv_addr, std::string const& group_id) : +test_state_mgr::test_state_mgr(int32_t srv_id, nuraft_mesg::peer_id_t const& srv_addr, + nuraft_mesg::group_id_t const& group_id) : nuraft_mesg::mesg_state_mgr(), _srv_id(srv_id), _srv_addr(srv_addr), - _group_id(group_id.c_str()), + _group_id(group_id), _state_machine(std::make_shared< test_state_machine >()) {} nuraft::ptr< nuraft::cluster_config > test_state_mgr::load_config() { @@ -107,7 +109,7 @@ nuraft::ptr< nuraft::cluster_config > test_state_mgr::load_config() { json config_map; if (auto err = loadConfigFile(config_map, _group_id, _srv_id); !err) { return fromClusterConfig(config_map); } auto conf = nuraft::cs_new< nuraft::cluster_config >(); - conf->get_servers().push_back(nuraft::cs_new< nuraft::srv_config >(_srv_id, _srv_addr)); + conf->get_servers().push_back(nuraft::cs_new< nuraft::srv_config >(_srv_id, to_string(_srv_addr))); return conf; } diff --git a/src/tests/test_state_manager.h b/src/tests/test_state_manager.h index ca2bb52..7d464f4 100644 --- a/src/tests/test_state_manager.h +++ b/src/tests/test_state_manager.h @@ -14,7 +14,7 @@ *********************************************************************************/ #pragma once -#include "mesg_service.hpp" +#include "nuraft_mesg/nuraft_mesg.hpp" #include class test_state_machine; @@ -25,7 +25,7 @@ class service; class test_state_mgr : public nuraft_mesg::mesg_state_mgr { public: - test_state_mgr(int32_t srv_id, std::string const& srv_addr, std::string const& group_id); + test_state_mgr(int32_t srv_id, nuraft_mesg::peer_id_t const& srv_addr, nuraft_mesg::group_id_t const& group_id); ~test_state_mgr() override = default; nuraft::ptr< nuraft::cluster_config > load_config() override; @@ -55,8 +55,8 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr { private: private: int32_t const _srv_id; - std::string const _srv_addr; - std::string const _group_id; + nuraft_mesg::peer_id_t const _srv_addr; + nuraft_mesg::group_id_t const _group_id; std::shared_ptr< test_state_machine > _state_machine; inline static std::atomic< uint32_t > server_counter{0}; diff --git a/test_package/example_server.cpp b/test_package/example_server.cpp index a3de404..4df35a9 100644 --- a/test_package/example_server.cpp +++ b/test_package/example_server.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include #include #include #include