diff --git a/src/include/nuraft_mesg/common.hpp b/src/include/nuraft_mesg/common.hpp index 73ff360..1f0f4be 100644 --- a/src/include/nuraft_mesg/common.hpp +++ b/src/include/nuraft_mesg/common.hpp @@ -28,6 +28,9 @@ using Result = folly::Expected< T, nuraft::cmd_result_code >; template < typename T > using AsyncResult = folly::SemiFuture< Result< T > >; +using NullResult = Result< folly::Unit >; +using NullAsyncResult = AsyncResult< folly::Unit >; + } // namespace nuraft_mesg namespace fmt { diff --git a/src/include/nuraft_mesg/grpc_factory.hpp b/src/include/nuraft_mesg/grpc_factory.hpp index 4a282e7..6643cf8 100644 --- a/src/include/nuraft_mesg/grpc_factory.hpp +++ b/src/include/nuraft_mesg/grpc_factory.hpp @@ -18,7 +18,6 @@ // inherited rpc_client instances sharing a common worker pool. #pragma once -#include #include #include @@ -51,15 +50,13 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0; // Construct and send an AddServer message to the cluster - std::future< nuraft::cmd_result_code > add_server(uint32_t const srv_id, std::string const& srv_addr, - nuraft::srv_config const& dest_cfg); + NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg); // Send a client request to the cluster - std::future< nuraft::cmd_result_code > client_request(std::shared_ptr< nuraft::buffer > buf, - nuraft::srv_config const& dest_cfg); + NullAsyncResult client_request(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg); // Construct and send a RemoveServer message to the cluster - std::future< nuraft::cmd_result_code > rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg); + NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg); }; } // namespace nuraft_mesg diff --git a/src/include/nuraft_mesg/mesg_factory.hpp b/src/include/nuraft_mesg/mesg_factory.hpp index 0b358fc..c5a6f90 100644 --- a/src/include/nuraft_mesg/mesg_factory.hpp +++ b/src/include/nuraft_mesg/mesg_factory.hpp @@ -37,7 +37,7 @@ class group_factory : public grpc_factory { static std::string m_ssl_cert; public: - group_factory(int const cli_thread_count, boost::uuids::uuid const& name, + group_factory(int const cli_thread_count, group_id_t const& name, std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = ""); using grpc_factory::create_client; diff --git a/src/lib/grpc_factory.cpp b/src/lib/grpc_factory.cpp index a2026ee..fc3e126 100644 --- a/src/lib/grpc_factory.cpp +++ b/src/lib/grpc_factory.cpp @@ -17,6 +17,9 @@ // grpc_factory static functions that makes for easy client creation. // #include +#include +#include +#include #include #include "grpc_client.hpp" @@ -28,30 +31,38 @@ namespace nuraft_mesg { template < typename Payload > struct client_ctx { int32_t _cur_dest; - std::string const _new_srv_addr; + peer_id_t const _new_srv_addr; client_ctx(Payload payload, std::shared_ptr< grpc_factory > factory, int32_t dest, - std::string const& new_srv_addr = "") : + peer_id_t const& new_srv_addr = peer_id_t()) : _cur_dest(dest), _new_srv_addr(new_srv_addr), _payload(payload), _cli_factory(factory) {} Payload payload() const { return _payload; } std::shared_ptr< grpc_factory > cli_factory() const { return _cli_factory; } - std::future< nuraft::cmd_result_code > future() { return _promise.get_future(); } - void set(nuraft::cmd_result_code const code) { return _promise.set_value(code); } + NullAsyncResult future() { + auto [p, sf] = folly::makePromiseContract< NullResult >(); + _promise = std::move(p); + return sf; + } + void set(nuraft::cmd_result_code const code) { + if (nuraft::OK == code) + _promise.setValue(folly::Unit()); + else + _promise.setValue(folly::makeUnexpected(code)); + } private: Payload const _payload; std::shared_ptr< grpc_factory > _cli_factory; - std::promise< nuraft::cmd_result_code > _promise; + folly::Promise< NullResult > _promise; }; template < typename PayloadType > -std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, std::string const& srv_addr = ""); +std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, peer_id_t const& srv_addr = peer_id_t()); template <> -std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, std::string const& srv_addr) { - assert(!srv_addr.empty()); - auto srv_conf = nuraft::srv_config(srv_id, srv_addr); +std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, peer_id_t const& srv_addr) { + auto srv_conf = nuraft::srv_config(srv_id, to_string(srv_addr)); auto log = std::make_shared< nuraft::log_entry >(0, srv_conf.serialize(), nuraft::log_val_type::cluster_server); auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::add_server_request, 0, 0, 0, 0, 0); msg->log_entries().push_back(log); @@ -59,7 +70,7 @@ std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, std::str } template <> -std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, std::string const&) { +std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, peer_id_t const&) { auto log = std::make_shared< nuraft::log_entry >(0, buf); auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::client_request, 0, 1, 0, 0, 0); msg->log_entries().push_back(log); @@ -67,7 +78,7 @@ std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer } template <> -std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, std::string const&) { +std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, peer_id_t const&) { auto buf = nuraft::buffer::alloc(sizeof(srv_id)); buf->put(srv_id); buf->pos(0); @@ -165,15 +176,10 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c return new_client; } -std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const srv_id, std::string const& srv_addr, - nuraft::srv_config const& dest_cfg) { +NullAsyncResult grpc_factory::add_server(uint32_t const srv_id, peer_id_t const& srv_addr, + nuraft::srv_config const& dest_cfg) { auto client = create_client(dest_cfg.get_endpoint()); - assert(client); - if (!client) { - std::promise< nuraft::cmd_result_code > p; - p.set_value(nuraft::CANCELLED); - return p.get_future(); - } + if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); } auto ctx = std::make_shared< client_ctx< uint32_t > >(srv_id, shared_from_this(), dest_cfg.get_id(), srv_addr); auto handler = static_cast< nuraft::rpc_handler >( @@ -186,15 +192,9 @@ std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const s return ctx->future(); } -std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const srv_id, - nuraft::srv_config const& dest_cfg) { +NullAsyncResult grpc_factory::rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg) { auto client = create_client(dest_cfg.get_endpoint()); - assert(client); - if (!client) { - std::promise< nuraft::cmd_result_code > p; - p.set_value(nuraft::CANCELLED); - return p.get_future(); - } + if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); } auto ctx = std::make_shared< client_ctx< int32_t > >(srv_id, shared_from_this(), dest_cfg.get_id()); auto handler = static_cast< nuraft::rpc_handler >( @@ -207,15 +207,10 @@ std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const s return ctx->future(); } -std::future< nuraft::cmd_result_code > grpc_factory::client_request(std::shared_ptr< nuraft::buffer > buf, - nuraft::srv_config const& dest_cfg) { +NullAsyncResult grpc_factory::client_request(std::shared_ptr< nuraft::buffer > buf, + nuraft::srv_config const& dest_cfg) { auto client = create_client(dest_cfg.get_endpoint()); - assert(client); - if (!client) { - std::promise< nuraft::cmd_result_code > p; - p.set_value(nuraft::CANCELLED); - return p.get_future(); - } + if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); } auto ctx = std::make_shared< client_ctx< std::shared_ptr< nuraft::buffer > > >(buf, shared_from_this(), dest_cfg.get_id()); diff --git a/src/tests/MessagingTest.cpp b/src/tests/MessagingTest.cpp index fc62ac8..ff1e8d8 100644 --- a/src/tests/MessagingTest.cpp +++ b/src/tests/MessagingTest.cpp @@ -98,6 +98,23 @@ class TestApplication : public MessagingApplication, public std::enable_shared_f std::map< nuraft_mesg::peer_id_t, std::string > lookup_map_; }; +struct custom_factory : public nuraft_mesg::group_factory { + custom_factory(int const threads, nuraft_mesg::group_id_t const& name) : + nuraft_mesg::group_factory::group_factory(threads, name, nullptr) {} + + std::string lookupEndpoint(nuraft_mesg::peer_id_t const& peer) override { + auto lg = std::scoped_lock(lookup_lock_); + return (lookup_map_.count(peer) > 0) ? lookup_map_[peer] : std::string(); + } + + void map_peers(std::map< nuraft_mesg::peer_id_t, std::string > const& peers) { + auto lg = std::scoped_lock(lookup_lock_); + lookup_map_ = peers; + } + std::mutex lookup_lock_; + std::map< nuraft_mesg::peer_id_t, std::string > lookup_map_; +}; + extern nuraft::ptr< nuraft::cluster_config > fromClusterConfig(nlohmann::json const& cluster_config); static nuraft::ptr< nuraft::buffer > create_message(nlohmann::json const& j_obj) { @@ -126,6 +143,8 @@ class MessagingFixtureBase : public ::testing::Test { group_id_t group_id_; + std::shared_ptr< custom_factory > custom_factory_; + void get_random_ports(const uint16_t n) { auto cur_size = ports.size(); for (; ports.size() < cur_size + n;) { @@ -161,13 +180,19 @@ class MessagingFixtureBase : public ::testing::Test { EXPECT_TRUE(!!app_1_->instance_->create_group(group_id_, "test_type").get()); std::this_thread::sleep_for(std::chrono::seconds(1)); + // Use app1 to add Server 3 auto add2 = app_1_->instance_->add_member(group_id_, app_2_->id_); std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(std::move(add2).get()); - auto add3 = app_1_->instance_->add_member(group_id_, app_3_->id_); + custom_factory_ = std::make_shared< custom_factory >(2, group_id_); + custom_factory_->map_peers(lookup_map); + + // Use custom factory to add Server 3 + auto factory = std::make_shared< mesg_factory >(custom_factory_, group_id_, "test_type"); + auto const dest_cfg = nuraft::srv_config(to_server_id(app_1_->id_), to_string(app_1_->id_)); + EXPECT_TRUE(!!factory->add_server(to_server_id(app_3_->id_), app_3_->id_, dest_cfg).get()); std::this_thread::sleep_for(std::chrono::seconds(1)); - EXPECT_TRUE(std::move(add3).get()); } };