diff --git a/src/include/mesg_factory.hpp b/src/include/mesg_factory.hpp index 954ac2e..cb39a15 100644 --- a/src/include/mesg_factory.hpp +++ b/src/include/mesg_factory.hpp @@ -33,7 +33,21 @@ namespace nuraft_mesg { using group_name_t = std::string; using group_type_t = std::string; -class group_factory; +class group_factory : public grpc_factory { + std::shared_ptr< sisl::GrpcTokenClient > m_token_client; + static std::string m_ssl_cert; + +public: + group_factory(int const cli_thread_count, std::string const& name, + std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = ""); + + using grpc_factory::create_client; + std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override; + std::error_condition reinit_client(std::string const& client, + std::shared_ptr< nuraft::rpc_client >& raft_client) override; + + virtual std::string lookupEndpoint(std::string const& client) = 0; +}; class mesg_factory final : public grpc_factory { std::shared_ptr< group_factory > _group_factory; diff --git a/src/include/mesg_service.hpp b/src/include/mesg_service.hpp index 48f6bde..a7eed6d 100644 --- a/src/include/mesg_service.hpp +++ b/src/include/mesg_service.hpp @@ -18,15 +18,12 @@ #include #include #include -#include -#include -#include -#include #include -#include #include +#include "mesg_state_mgr.hpp" + SISL_LOGGING_DECL(nuraft) namespace grpc { @@ -37,57 +34,18 @@ class Status; namespace sisl { class GrpcTokenVerifier; class GrpcTokenClient; -class GenericRpcData; using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::Status& status) >; } // namespace sisl -namespace boost { -template < class T > -class intrusive_ptr; -} // namespace boost - namespace nuraft_mesg { -using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; - // called by the server after it receives the request using data_service_request_handler_t = std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >; -// This object can be stored by the caller and can be used to directly call raft/data operatons without taking -// _raft_servers_lock -class grpc_server; - -template < typename T > -using Result = folly::Expected< T, std::error_condition >; -template < typename T > -using AsyncResult = folly::SemiFuture< Result< T > >; - using NullResult = Result< folly::Unit >; using NullAsyncResult = AsyncResult< folly::Unit >; -class repl_service_ctx { -public: - repl_service_ctx(grpc_server* server); - virtual ~repl_service_ctx() = default; - - // we do not own this pointer. Use this only if the lyfe cycle of the pointer is well known - grpc_server* m_server; - bool is_raft_leader() const; - - // data service api client call - virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, - io_blob_list_t const& cli_buf) = 0; - - // Send response to a data service request and finish the async call. - virtual void send_data_service_response(io_blob_list_t const& outgoing_buf, - boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) = 0; -}; - -extern int32_t to_server_id(std::string const& server_addr); - -class mesg_state_mgr; - class MessagingApplication { public: virtual ~MessagingApplication() = default; @@ -134,6 +92,8 @@ class Manager { data_service_request_handler_t const&) = 0; }; +extern int32_t to_server_id(std::string const& server_addr); + extern std::shared_ptr< Manager > init_messaging(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool with_data_svc = false); diff --git a/src/include/mesg_state_mgr.hpp b/src/include/mesg_state_mgr.hpp index 3bb859e..aebc249 100644 --- a/src/include/mesg_state_mgr.hpp +++ b/src/include/mesg_state_mgr.hpp @@ -1,10 +1,53 @@ +#pragma once + #include +#include +#include +#include +#include + #include +#include + +namespace boost { +template < class T > +class intrusive_ptr; +} // namespace boost + +namespace sisl { +class GenericRpcData; +} namespace nuraft_mesg { class mesg_factory; +class grpc_server; + +using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >; + +template < typename T > +using Result = folly::Expected< T, std::error_condition >; +template < typename T > +using AsyncResult = folly::SemiFuture< Result< T > >; + +class repl_service_ctx { +public: + repl_service_ctx(grpc_server* server); + virtual ~repl_service_ctx() = default; + + // we do not own this pointer. Use this only if the lyfe cycle of the pointer is well known + grpc_server* m_server; + bool is_raft_leader() const; + + // data service api client call + virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, + io_blob_list_t const& cli_buf) = 0; + + // Send response to a data service request and finish the async call. + virtual void send_data_service_response(io_blob_list_t const& outgoing_buf, + boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) = 0; +}; class mesg_state_mgr : public nuraft::state_mgr { public: diff --git a/src/lib/grpc_client.cpp b/src/lib/grpc_client.cpp index 88feb5c..772a6d9 100644 --- a/src/lib/grpc_client.cpp +++ b/src/lib/grpc_client.cpp @@ -13,6 +13,8 @@ * *********************************************************************************/ +#include + // Brief: // grpc_client does the protobuf transformations on nuraft req's // diff --git a/src/lib/messaging.cpp b/src/lib/messaging.cpp index 16df658..69e84b3 100644 --- a/src/lib/messaging.cpp +++ b/src/lib/messaging.cpp @@ -18,7 +18,7 @@ #include "service.hpp" #include "mesg_factory.hpp" -#include "mesg_state_mgr.hpp" +#include "mesg_service.hpp" #include "logger.hpp" #include "utils.hpp" diff --git a/src/lib/service.cpp b/src/lib/service.cpp index 567f3ff..345d788 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -15,7 +15,7 @@ #include "libnuraft/rpc_listener.hxx" #include "service.hpp" #include "mesg_factory.hpp" -#include "mesg_state_mgr.hpp" +#include "mesg_service.hpp" SISL_LOGGING_DECL(nuraft_mesg) diff --git a/src/lib/utils.hpp b/src/lib/utils.hpp index 7db4d6a..e48677a 100644 --- a/src/lib/utils.hpp +++ b/src/lib/utils.hpp @@ -57,22 +57,4 @@ static std::string get_generic_method_name(std::string const& request_name, std: return fmt::format("{}|{}", request_name, group_id); } -class group_factory : public grpc_factory { - std::shared_ptr< sisl::GrpcTokenClient > m_token_client; - static std::string m_ssl_cert; - -public: - group_factory(int const cli_thread_count, std::string const& name, - std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = ""); - - using grpc_factory::create_client; - - std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override; - - std::error_condition reinit_client(std::string const& client, - std::shared_ptr< nuraft::rpc_client >& raft_client) override; - - virtual std::string lookupEndpoint(std::string const& client) = 0; -}; - } // namespace nuraft_mesg diff --git a/src/tests/test_state_manager.h b/src/tests/test_state_manager.h index 36541bc..da25c16 100644 --- a/src/tests/test_state_manager.h +++ b/src/tests/test_state_manager.h @@ -15,7 +15,6 @@ #pragma once #include "mesg_service.hpp" -#include "mesg_state_mgr.hpp" #include class test_state_machine; diff --git a/test_package/example_server.cpp b/test_package/example_server.cpp index 52d9238..a3de404 100644 --- a/test_package/example_server.cpp +++ b/test_package/example_server.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include #include #include #include @@ -45,6 +45,55 @@ void handle(int signal) { } } +class Application : public nuraft_mesg::MessagingApplication, public std::enable_shared_from_this< Application > { +public: + std::string name_; + uint32_t port_; + std::string id_; + std::shared_ptr< nuraft_mesg::Manager > manager_; + + Application(std::string const& name, uint32_t port) : name_(name), port_(port) { id_ = name; } + ~Application() override = default; + + std::string lookup_peer(std::string const& peer) override { + // Provide a method for the service layer to lookup an IPv4:port address + // from a uuid; however the process wants to do that. + for (auto i = 0u; i < 5; ++i) { + if (uuids[i] == peer) { return fmt::format(FMT_STRING("127.0.0.1:{}"), 9000 + i); } + } + RELEASE_ASSERT(false, "Missing Peer: {}", peer); + } + + std::shared_ptr< nuraft_mesg::mesg_state_mgr > create_state_mgr(int32_t const srv_id, + std::string const& group_id) override { + // Each group has a type so we can attach different state_machines upon Join request. + // This callback should provide a mechanism to return a new state_manager. + auto [it, _] = state_mgr_map.emplace( + std::make_pair(group_id + "_" + name_, std::make_shared< simple_state_mgr >(srv_id, id_, group_id))); + return std::static_pointer_cast< nuraft_mesg::mesg_state_mgr >(it->second); + } + + void start() { + auto params = nuraft_mesg::Manager::Params(); + params.server_uuid_ = id_; + params.mesg_port_ = port_; + params.default_group_type_ = "test_package"; + manager_ = init_messaging(params, weak_from_this(), false); + auto r_params = nuraft::raft_params() + .with_election_timeout_lower(elect_to_low) + .with_election_timeout_upper(elect_to_high) + .with_hb_interval(heartbeat_period) + .with_max_append_size(10) + .with_rpc_failure_backoff(rpc_backoff) + .with_auto_forwarding(true) + .with_snapshot_enabled(0); + manager_->register_mgr_type(params.default_group_type_, r_params); + } + +private: + std::map< std::string, std::shared_ptr< simple_state_mgr > > state_mgr_map; +}; + int main(int argc, char** argv) { SISL_OPTIONS_LOAD(argc, argv, logging, server, nuraft_mesg); @@ -64,46 +113,8 @@ int main(int argc, char** argv) { auto const server_port = 9000 + offset_id; LOGINFO("Server starting as: [{}], port: [{}]", server_uuid, server_port); - // Provide a method for the service layer to lookup an IPv4:port address - // from a uuid; however the process wants to do that. - auto messaging_params = - nuraft_mesg::consensus_component::params{server_uuid, server_port, - [](std::string const& client) -> std::string { - for (auto i = 0u; i < 5; ++i) { - if (uuids[i] == client) { - return fmt::format(FMT_STRING("127.0.0.1:{}"), - 9000 + i); - } - } - return client; - }, - "none"}; - - // Intitialize the messaging layer. - auto messaging = nuraft_mesg::service(); - - // RAFT server parameters - nuraft::raft_params r_params; - r_params.with_election_timeout_lower(elect_to_low) - .with_election_timeout_upper(elect_to_high) - .with_hb_interval(heartbeat_period) - .with_max_append_size(10) - .with_rpc_failure_backoff(rpc_backoff) - .with_auto_forwarding(true) - .with_snapshot_enabled(0); - // Each group has a type so we can attach different state_machines upon Join request. - // This callback should provide a mechanism to return a new state_manager. - auto group_type_params = nuraft_mesg::consensus_component::register_params{ - r_params, - [server_uuid](int32_t const srv_id, - std::string const& group_id) -> std::shared_ptr< nuraft_mesg::mesg_state_mgr > { - return std::make_shared< simple_state_mgr >(srv_id, server_uuid, group_id); - }}; - messaging.register_mgr_type("test_package", group_type_params); - - // This will start the RPC service and begin listening for incomming JOIN groups request. - // You can also call create_group and join_group following this operation. - messaging.start(messaging_params); + auto app = std::make_shared< Application >(server_uuid, server_port); + app->start(); { auto lck = std::lock_guard< std::mutex >(k_stop_cv_lock); @@ -112,7 +123,7 @@ int main(int argc, char** argv) { // Create a new group with ourself as the only member if (0 < SISL_OPTIONS.count("create")) { - messaging.create_group(SISL_OPTIONS["create"].as< std::string >(), "test_package"); + app->manager_->create_group(SISL_OPTIONS["create"].as< std::string >(), "test_package"); } // Just prevent main() from exiting, require a SIGNAL diff --git a/test_package/example_state_manager.h b/test_package/example_state_manager.h index 10259e4..4d005cb 100644 --- a/test_package/example_state_manager.h +++ b/test_package/example_state_manager.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include class simple_state_mgr : public nuraft_mesg::mesg_state_mgr {