Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic client workers #77

Merged
merged 3 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "3.1.0"
version = "3.2.1"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
8 changes: 8 additions & 0 deletions include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,21 @@ using client_factory_lock_type = folly::SharedMutex;
// inherited rpc_client instances sharing a common worker pool.
class grpc_factory : public nuraft::rpc_client_factory, public std::enable_shared_from_this< grpc_factory > {
std::string _worker_name;
std::string _raft_worker_name;
std::string _data_worker_name;

protected:
client_factory_lock_type _client_lock;
std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients;

public:
grpc_factory(int const cli_thread_count, std::string const& name);
grpc_factory(int const raft_cli_thread_count, int const data_cli_thread_count, std::string const& name);
~grpc_factory() override = default;

std::string const& workerName() const { return _worker_name; }
std::string const& raftWorkerName() const { return _raft_worker_name; }
std::string const& dataWorkerName() const { return _data_worker_name; }
raakella1 marked this conversation as resolved.
Show resolved Hide resolved

nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override;
nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client);
Expand All @@ -74,6 +79,9 @@ class group_factory : public grpc_factory {
group_factory(int const cli_thread_count, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

group_factory(int const raft_cli_thread_count, int const data_cli_thread_count, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
Expand Down
4 changes: 2 additions & 2 deletions src/lib/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class grpc_client : public grpc_base_client, public sisl::GrpcAsyncClient {
grpc_base_client(),
sisl::GrpcAsyncClient(addr, token_client, target_domain, ssl_cert),
_addr(addr),
_worker_name(worker_name.data()) {
_worker_name(worker_name) {
init();
}

Expand All @@ -81,7 +81,7 @@ class grpc_client : public grpc_base_client, public sisl::GrpcAsyncClient {

protected:
std::string const _addr;
char const* _worker_name;
std::string _worker_name;
typename ::sisl::GrpcAsyncClient::AsyncStub< TSERVICE >::UPtr _stub;
};

Expand Down
14 changes: 13 additions & 1 deletion src/lib/factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,19 @@ void respHandler(std::shared_ptr< ContextType > ctx, std::shared_ptr< nuraft::re

grpc_factory::grpc_factory(int const cli_thread_count, std::string const& name) :
rpc_client_factory(), _worker_name(name) {
if (0 < cli_thread_count) { sisl::GrpcAsyncClientWorker::create_worker(_worker_name.data(), cli_thread_count); }
if (0 < cli_thread_count) { sisl::GrpcAsyncClientWorker::create_worker(_worker_name, cli_thread_count); }
}

grpc_factory::grpc_factory(int const raft_cli_thread_count, int const data_cli_thread_count, std::string const& name) :
rpc_client_factory(),
_raft_worker_name(fmt::format("raft_{}", name)),
_data_worker_name(fmt::format("data_{}", name)) {
if (0 < raft_cli_thread_count) {
sisl::GrpcAsyncClientWorker::create_worker(_raft_worker_name, raft_cli_thread_count);
}
if (0 < data_cli_thread_count) {
sisl::GrpcAsyncClientWorker::create_worker(_data_worker_name, data_cli_thread_count);
}
}

class grpc_error_client : public grpc_base_client {
Expand Down
32 changes: 20 additions & 12 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
#include "repl_service_ctx.hpp"
#include "service.hpp"
#include "logger.hpp"

constexpr auto leader_change_timeout = std::chrono::milliseconds(3200);
constexpr auto grpc_client_threads = 1u;
constexpr auto grpc_server_threads = 1u;
#include "nuraft_mesg_config.hpp"

SISL_LOGGING_DEF(nuraft_mesg)

Expand All @@ -44,6 +41,12 @@ class engine_factory : public group_factory {
start_params.ssl_cert_),
application_(app) {}

raakella1 marked this conversation as resolved.
Show resolved Hide resolved
engine_factory(int const raft_threads, int const data_threads, Manager::Params const& start_params,
std::weak_ptr< MessagingApplication > app) :
group_factory::group_factory(raft_threads, data_threads, start_params.server_uuid_,
start_params.token_client_, start_params.ssl_cert_),
application_(app) {}

std::string lookupEndpoint(peer_id_t const& client) override {
LOGT("[peer={}]", client);
if (auto a = application_.lock(); a) return a->lookup_peer(client);
Expand All @@ -60,7 +63,9 @@ ManagerImpl::~ManagerImpl() {

ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app) :
start_params_(start_params), _srv_id(to_server_id(start_params_.server_uuid_)), application_(app) {
_g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app);
_g_factory =
std::make_shared< engine_factory >(NURAFT_MESG_CONFIG(grpc_raft_client_thread_cnt),
NURAFT_MESG_CONFIG(grpc_data_client_thread_cnt), start_params_, app);
auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_);
//
// NOTE: The Unit tests require this instance to be recreated with the same parameters.
Expand All @@ -75,7 +80,7 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes

// RAFT request scheduler
nuraft::asio_service::options service_options;
service_options.thread_pool_size_ = 1;
service_options.thread_pool_size_ = NURAFT_MESG_CONFIG(raft_scheduler_thread_cnt);
_scheduler = std::make_shared< nuraft::asio_service >(service_options, logger);
}

Expand All @@ -94,9 +99,9 @@ void ManagerImpl::restart_server() {
std::lock_guard< std::mutex > lg(_manager_lock);
RELEASE_ASSERT(_mesg_service, "Need to call ::start() first!");
_grpc_server.reset();
_grpc_server = std::unique_ptr< sisl::GrpcServer >(
sisl::GrpcServer::make(listen_address, start_params_.token_verifier_, grpc_server_threads,
start_params_.ssl_key_, start_params_.ssl_cert_));
_grpc_server = std::unique_ptr< sisl::GrpcServer >(sisl::GrpcServer::make(
listen_address, start_params_.token_verifier_, NURAFT_MESG_CONFIG(grpc_server_thread_cnt),
start_params_.ssl_key_, start_params_.ssl_cert_));
_mesg_service->associate(_grpc_server.get());

_grpc_server->run();
Expand Down Expand Up @@ -223,7 +228,8 @@ NullAsyncResult ManagerImpl::add_member(group_id_t const& group_id, peer_id_t co
// TODO This should not block, but attach a new promise!
auto lk = std::unique_lock< std::mutex >(_manager_lock);
if (!_config_change.wait_for(
lk, leader_change_timeout, [this, g_id = std::move(g_id), n_id = std::move(n_id)]() {
lk, std::chrono::milliseconds(NURAFT_MESG_CONFIG(raft_leader_change_timeout_ms)),
[this, g_id = std::move(g_id), n_id = std::move(n_id)]() {
std::vector< std::shared_ptr< nuraft::srv_config > > srv_list;
_mesg_service->get_srv_config_all(g_id, srv_list);
return std::find_if(srv_list.begin(), srv_list.end(),
Expand Down Expand Up @@ -252,7 +258,8 @@ NullAsyncResult ManagerImpl::become_leader(group_id_t const& group_id) {
if (!_mesg_service->become_leader(g_id)) return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);

auto lk = std::unique_lock< std::mutex >(_manager_lock);
if (!_config_change.wait_for(lk, leader_change_timeout,
if (!_config_change.wait_for(lk,
std::chrono::milliseconds(NURAFT_MESG_CONFIG(raft_leader_change_timeout_ms)),
[this, g_id = std::move(g_id)]() { return _is_leader[g_id]; }))
return folly::makeUnexpected(nuraft::cmd_result_code::TIMEOUT);
return folly::Unit();
Expand Down Expand Up @@ -283,7 +290,8 @@ NullAsyncResult ManagerImpl::create_group(group_id_t const& group_id, std::strin
return folly::makeSemiFuture< folly::Unit >(folly::Unit())
.deferValue([this, g_id = group_id](auto) mutable -> NullResult {
auto lk = std::unique_lock< std::mutex >(_manager_lock);
if (!_config_change.wait_for(lk, leader_change_timeout,
if (!_config_change.wait_for(lk,
std::chrono::milliseconds(NURAFT_MESG_CONFIG(raft_leader_change_timeout_ms)),
[this, g_id = std::move(g_id)]() { return _is_leader[g_id]; })) {
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
Expand Down
10 changes: 10 additions & 0 deletions src/lib/nuraft_mesg_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ table MesgFactoryConfig {

table NuraftMesgConfig {
mesg_factory_config: MesgFactoryConfig;

grpc_raft_client_thread_cnt: uint16 = 1;

grpc_data_client_thread_cnt: uint16 = 1;

grpc_server_thread_cnt: uint16 = 2;
raakella1 marked this conversation as resolved.
Show resolved Hide resolved

raft_leader_change_timeout_ms: uint32 = 3200;

raft_scheduler_thread_cnt: uint16 = 2;
}

root_type NuraftMesgConfig;
11 changes: 11 additions & 0 deletions src/lib/nuraft_mesg_config.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once
#include <sisl/settings/settings.hpp>
#include "lib/generated/nuraft_mesg_config_generated.h"

SETTINGS_INIT(nuraftmesgcfg::NuraftMesgConfig, nuraft_mesg_config);

#define NURAFT_MESG_CONFIG_WITH(...) SETTINGS(nuraft_mesg_config, __VA_ARGS__)
#define NURAFT_MESG_CONFIG_THIS(...) SETTINGS_THIS(nuraft_mesg_config, __VA_ARGS__)
#define NURAFT_MESG_CONFIG(...) SETTINGS_VALUE(nuraft_mesg_config, __VA_ARGS__)

#define NURAFT_MESG_SETTINGS_FACTORY() SETTINGS_FACTORY(nuraft_mesg_config)
28 changes: 16 additions & 12 deletions src/proto/proto_mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,15 @@
#include <string>

#include <folly/futures/Future.h>
#include <sisl/settings/settings.hpp>

#include "nuraft_mesg/mesg_factory.hpp"
#include "lib/client.hpp"
#include "lib/service.hpp"
#include "lib/generated/nuraft_mesg_config_generated.h"
#include "lib/nuraft_mesg_config.hpp"

#include "messaging_service.grpc.pb.h"
#include "utils.hpp"

SETTINGS_INIT(nuraftmesgcfg::NuraftMesgConfig, nuraft_mesg_config);

#define NURAFT_MESG_CONFIG_WITH(...) SETTINGS(nuraft_mesg_config, __VA_ARGS__)
#define NURAFT_MESG_CONFIG_THIS(...) SETTINGS_THIS(nuraft_mesg_config, __VA_ARGS__)
#define NURAFT_MESG_CONFIG(...) SETTINGS_VALUE(nuraft_mesg_config, __VA_ARGS__)

#define NURAFT_MESG_SETTINGS_FACTORY() SETTINGS_FACTORY(nuraft_mesg_config)

namespace nuraft_mesg {

std::string group_factory::m_ssl_cert;
Expand All @@ -48,6 +39,13 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
ssl_cert) {
_generic_stub = sisl::GrpcAsyncClient::make_generic_stub(_worker_name);
}
messaging_client(std::string const& raft_worker_name, std::string const& data_worker_name, std::string const& addr,
const std::shared_ptr< sisl::GrpcTokenClient > token_client, std::string const& target_domain = "",
std::string const& ssl_cert = "") :
nuraft_mesg::grpc_client< Messaging >::grpc_client(raft_worker_name, addr, token_client, target_domain,
ssl_cert) {
_generic_stub = sisl::GrpcAsyncClient::make_generic_stub(data_worker_name);
}
~messaging_client() override = default;

using grpc_base_client::send;
Expand Down Expand Up @@ -230,15 +228,21 @@ group_factory::group_factory(int const cli_thread_count, group_id_t const& name,
m_ssl_cert = ssl_cert;
}

group_factory::group_factory(int const raft_cli_thread_count, int const data_cli_thread_count, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert) :
grpc_factory(raft_cli_thread_count, data_cli_thread_count, to_string(name)), m_token_client(token_client) {
m_ssl_cert = ssl_cert;
}

nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
LOGD("Creating client to {}", client);
auto endpoint = lookupEndpoint(client);
if (endpoint.empty()) return nuraft::BAD_REQUEST;

LOGD("Creating client for [{}] @ [{}]", client, endpoint);
raft_client =
sisl::GrpcAsyncClient::make< messaging_client >(workerName(), endpoint, m_token_client, "", m_ssl_cert);
raft_client = sisl::GrpcAsyncClient::make< messaging_client >(raftWorkerName(), dataWorkerName(), endpoint,
m_token_client, "", m_ssl_cert);
return (!raft_client) ? nuraft::CANCELLED : nuraft::OK;
}

Expand Down
2 changes: 1 addition & 1 deletion src/proto/proto_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) {
req->set_accepted(rcmsg.get_accepted());
req->set_result_code((ResultCode)(0 - rcmsg.get_result_code()));
auto ctx = rcmsg.get_ctx();
if (ctx) { req->set_context(ctx->data(), ctx->container_size()); }
if (ctx) { req->set_context(ctx->data(), ctx->size()); }
return req;
}

Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_fixture.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ private:
};

struct custom_factory : public nuraft_mesg::group_factory {
custom_factory(int const threads, nuraft_mesg::group_id_t const& name) :
nuraft_mesg::group_factory::group_factory(threads, name, nullptr) {}
custom_factory(int const raft_threads, int const data_threads, nuraft_mesg::group_id_t const& name) :
nuraft_mesg::group_factory::group_factory(raft_threads, data_threads, name, nullptr) {}

std::string lookupEndpoint(nuraft_mesg::peer_id_t const& peer) override {
auto lg = std::scoped_lock(lookup_lock_);
Expand Down Expand Up @@ -190,7 +190,7 @@ protected:
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_TRUE(std::move(add2).get());

custom_factory_ = std::make_shared< custom_factory >(2, group_id_);
custom_factory_ = std::make_shared< custom_factory >(2, 2, group_id_);
custom_factory_->map_peers(lookup_map);

// Use custom factory to add Server 3
Expand Down
Loading