Skip to content

Commit

Permalink
Test grpc_factory.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 4, 2023
1 parent 2e9b6ad commit a889037
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 44 deletions.
3 changes: 3 additions & 0 deletions src/include/nuraft_mesg/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ using Result = folly::Expected< T, nuraft::cmd_result_code >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

using NullResult = Result< folly::Unit >;
using NullAsyncResult = AsyncResult< folly::Unit >;

} // namespace nuraft_mesg

namespace fmt {
Expand Down
9 changes: 3 additions & 6 deletions src/include/nuraft_mesg/grpc_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// inherited rpc_client instances sharing a common worker pool.
#pragma once

#include <future>
#include <memory>

#include <folly/SharedMutex.h>
Expand Down Expand Up @@ -51,15 +50,13 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share
virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

// Construct and send an AddServer message to the cluster
std::future< nuraft::cmd_result_code > add_server(uint32_t const srv_id, std::string const& srv_addr,
nuraft::srv_config const& dest_cfg);
NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg);

// Send a client request to the cluster
std::future< nuraft::cmd_result_code > client_request(std::shared_ptr< nuraft::buffer > buf,
nuraft::srv_config const& dest_cfg);
NullAsyncResult client_request(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg);

// Construct and send a RemoveServer message to the cluster
std::future< nuraft::cmd_result_code > rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg);
NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg);
};

} // namespace nuraft_mesg
2 changes: 1 addition & 1 deletion src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class group_factory : public grpc_factory {
static std::string m_ssl_cert;

public:
group_factory(int const cli_thread_count, boost::uuids::uuid const& name,
group_factory(int const cli_thread_count, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
Expand Down
65 changes: 30 additions & 35 deletions src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
// grpc_factory static functions that makes for easy client creation.
//
#include <boost/uuid/string_generator.hpp>
#include <folly/Expected.h>
#include <folly/futures/Future.h>
#include <libnuraft/async.hxx>
#include <sisl/grpc/rpc_client.hpp>

#include "grpc_client.hpp"
Expand All @@ -28,46 +31,54 @@ namespace nuraft_mesg {
template < typename Payload >
struct client_ctx {
int32_t _cur_dest;
std::string const _new_srv_addr;
peer_id_t const _new_srv_addr;

client_ctx(Payload payload, std::shared_ptr< grpc_factory > factory, int32_t dest,
std::string const& new_srv_addr = "") :
peer_id_t const& new_srv_addr = peer_id_t()) :
_cur_dest(dest), _new_srv_addr(new_srv_addr), _payload(payload), _cli_factory(factory) {}

Payload payload() const { return _payload; }
std::shared_ptr< grpc_factory > cli_factory() const { return _cli_factory; }
std::future< nuraft::cmd_result_code > future() { return _promise.get_future(); }
void set(nuraft::cmd_result_code const code) { return _promise.set_value(code); }
NullAsyncResult future() {
auto [p, sf] = folly::makePromiseContract< NullResult >();
_promise = std::move(p);
return sf;
}
void set(nuraft::cmd_result_code const code) {
if (nuraft::OK == code)
_promise.setValue(folly::Unit());
else
_promise.setValue(folly::makeUnexpected(code));
}

private:
Payload const _payload;
std::shared_ptr< grpc_factory > _cli_factory;
std::promise< nuraft::cmd_result_code > _promise;
folly::Promise< NullResult > _promise;
};

template < typename PayloadType >
std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, std::string const& srv_addr = "");
std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, peer_id_t const& srv_addr = peer_id_t());

template <>
std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, std::string const& srv_addr) {
assert(!srv_addr.empty());
auto srv_conf = nuraft::srv_config(srv_id, srv_addr);
std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, peer_id_t const& srv_addr) {
auto srv_conf = nuraft::srv_config(srv_id, to_string(srv_addr));
auto log = std::make_shared< nuraft::log_entry >(0, srv_conf.serialize(), nuraft::log_val_type::cluster_server);
auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::add_server_request, 0, 0, 0, 0, 0);
msg->log_entries().push_back(log);
return msg;
}

template <>
std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, std::string const&) {
std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, peer_id_t const&) {
auto log = std::make_shared< nuraft::log_entry >(0, buf);
auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::client_request, 0, 1, 0, 0, 0);
msg->log_entries().push_back(log);
return msg;
}

template <>
std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, std::string const&) {
std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, peer_id_t const&) {
auto buf = nuraft::buffer::alloc(sizeof(srv_id));
buf->put(srv_id);
buf->pos(0);
Expand Down Expand Up @@ -165,15 +176,10 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
return new_client;
}

std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const srv_id, std::string const& srv_addr,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::add_server(uint32_t const srv_id, peer_id_t const& srv_addr,
nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx = std::make_shared< client_ctx< uint32_t > >(srv_id, shared_from_this(), dest_cfg.get_id(), srv_addr);
auto handler = static_cast< nuraft::rpc_handler >(
Expand All @@ -186,15 +192,9 @@ std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const s
return ctx->future();
}

std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const srv_id,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx = std::make_shared< client_ctx< int32_t > >(srv_id, shared_from_this(), dest_cfg.get_id());
auto handler = static_cast< nuraft::rpc_handler >(
Expand All @@ -207,15 +207,10 @@ std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const s
return ctx->future();
}

std::future< nuraft::cmd_result_code > grpc_factory::client_request(std::shared_ptr< nuraft::buffer > buf,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::client_request(std::shared_ptr< nuraft::buffer > buf,
nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx =
std::make_shared< client_ctx< std::shared_ptr< nuraft::buffer > > >(buf, shared_from_this(), dest_cfg.get_id());
Expand Down
29 changes: 27 additions & 2 deletions src/tests/MessagingTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ class TestApplication : public MessagingApplication, public std::enable_shared_f
std::map< nuraft_mesg::peer_id_t, std::string > lookup_map_;
};

struct custom_factory : public nuraft_mesg::group_factory {
custom_factory(int const threads, nuraft_mesg::group_id_t const& name) :
nuraft_mesg::group_factory::group_factory(threads, name, nullptr) {}

std::string lookupEndpoint(nuraft_mesg::peer_id_t const& peer) override {
auto lg = std::scoped_lock(lookup_lock_);
return (lookup_map_.count(peer) > 0) ? lookup_map_[peer] : std::string();
}

void map_peers(std::map< nuraft_mesg::peer_id_t, std::string > const& peers) {
auto lg = std::scoped_lock(lookup_lock_);
lookup_map_ = peers;
}
std::mutex lookup_lock_;
std::map< nuraft_mesg::peer_id_t, std::string > lookup_map_;
};

extern nuraft::ptr< nuraft::cluster_config > fromClusterConfig(nlohmann::json const& cluster_config);

static nuraft::ptr< nuraft::buffer > create_message(nlohmann::json const& j_obj) {
Expand Down Expand Up @@ -126,6 +143,8 @@ class MessagingFixtureBase : public ::testing::Test {

group_id_t group_id_;

std::shared_ptr< custom_factory > custom_factory_;

void get_random_ports(const uint16_t n) {
auto cur_size = ports.size();
for (; ports.size() < cur_size + n;) {
Expand Down Expand Up @@ -161,13 +180,19 @@ class MessagingFixtureBase : public ::testing::Test {
EXPECT_TRUE(!!app_1_->instance_->create_group(group_id_, "test_type").get());
std::this_thread::sleep_for(std::chrono::seconds(1));

// Use app1 to add Server 3
auto add2 = app_1_->instance_->add_member(group_id_, app_2_->id_);
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_TRUE(std::move(add2).get());

auto add3 = app_1_->instance_->add_member(group_id_, app_3_->id_);
custom_factory_ = std::make_shared< custom_factory >(2, group_id_);
custom_factory_->map_peers(lookup_map);

// Use custom factory to add Server 3
auto factory = std::make_shared< mesg_factory >(custom_factory_, group_id_, "test_type");
auto const dest_cfg = nuraft::srv_config(to_server_id(app_1_->id_), to_string(app_1_->id_));
EXPECT_TRUE(!!factory->add_server(to_server_id(app_3_->id_), app_3_->id_, dest_cfg).get());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_TRUE(std::move(add3).get());
}
};

Expand Down

0 comments on commit a889037

Please sign in to comment.