Skip to content

Commit

Permalink
Improve CodeCoverage to feature/folly_futures (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd authored Oct 4, 2023
1 parent 2726e88 commit 4ae2a0e
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 137 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/conan_build.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
name: NuraftMesg Build
name: NuraftMesg PR Build

on:
workflow_dispatch:
pull_request:
branches:
- main
- feature/*

jobs:
Build:
Expand Down
3 changes: 3 additions & 0 deletions src/include/nuraft_mesg/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ using Result = folly::Expected< T, nuraft::cmd_result_code >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

using NullResult = Result< folly::Unit >;
using NullAsyncResult = AsyncResult< folly::Unit >;

} // namespace nuraft_mesg

namespace fmt {
Expand Down
14 changes: 6 additions & 8 deletions src/include/nuraft_mesg/grpc_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// inherited rpc_client instances sharing a common worker pool.
#pragma once

#include <future>
#include <libnuraft/async.hxx>
#include <memory>

#include <folly/SharedMutex.h>
Expand Down Expand Up @@ -46,20 +46,18 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share
nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override;
nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client);

virtual std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

// Construct and send an AddServer message to the cluster
std::future< nuraft::cmd_result_code > add_server(uint32_t const srv_id, std::string const& srv_addr,
nuraft::srv_config const& dest_cfg);
NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg);

// Send a client request to the cluster
std::future< nuraft::cmd_result_code > client_request(std::shared_ptr< nuraft::buffer > buf,
nuraft::srv_config const& dest_cfg);
NullAsyncResult append_entry(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg);

// Construct and send a RemoveServer message to the cluster
std::future< nuraft::cmd_result_code > rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg);
NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg);
};

} // namespace nuraft_mesg
16 changes: 8 additions & 8 deletions src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*********************************************************************************/
#pragma once

#include <future>
#include <libnuraft/async.hxx>
#include <memory>
#include <mutex>
#include <string>
Expand All @@ -37,13 +37,13 @@ class group_factory : public grpc_factory {
static std::string m_ssl_cert;

public:
group_factory(int const cli_thread_count, boost::uuids::uuid const& name,
group_factory(int const cli_thread_count, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;
nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

virtual std::string lookupEndpoint(peer_id_t const& client) = 0;
};
Expand All @@ -65,10 +65,10 @@ class mesg_factory final : public grpc_factory {

group_id_t group_name() const { return _group_name; }

std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;
nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;

std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf);
};
Expand Down
3 changes: 1 addition & 2 deletions src/include/nuraft_mesg/nuraft_mesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <list>
#include <memory>
#include <string>
#include <system_error>
#include <vector>

#include <libnuraft/nuraft.hxx>
Expand Down Expand Up @@ -57,7 +56,7 @@ class Manager {
public:
struct Params {
boost::uuids::uuid server_uuid_;
uint32_t mesg_port_;
uint16_t mesg_port_;
group_type_t default_group_type_;
std::string ssl_key_;
std::string ssl_cert_;
Expand Down
72 changes: 33 additions & 39 deletions src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
// grpc_factory static functions that makes for easy client creation.
//
#include <boost/uuid/string_generator.hpp>
#include <folly/Expected.h>
#include <folly/futures/Future.h>
#include <libnuraft/async.hxx>
#include <sisl/grpc/rpc_client.hpp>

#include "grpc_client.hpp"
Expand All @@ -28,46 +31,54 @@ namespace nuraft_mesg {
template < typename Payload >
struct client_ctx {
int32_t _cur_dest;
std::string const _new_srv_addr;
peer_id_t const _new_srv_addr;

client_ctx(Payload payload, std::shared_ptr< grpc_factory > factory, int32_t dest,
std::string const& new_srv_addr = "") :
peer_id_t const& new_srv_addr = peer_id_t()) :
_cur_dest(dest), _new_srv_addr(new_srv_addr), _payload(payload), _cli_factory(factory) {}

Payload payload() const { return _payload; }
std::shared_ptr< grpc_factory > cli_factory() const { return _cli_factory; }
std::future< nuraft::cmd_result_code > future() { return _promise.get_future(); }
void set(nuraft::cmd_result_code const code) { return _promise.set_value(code); }
NullAsyncResult future() {
auto [p, sf] = folly::makePromiseContract< NullResult >();
_promise = std::move(p);
return sf;
}
void set(nuraft::cmd_result_code const code) {
if (nuraft::OK == code)
_promise.setValue(folly::Unit());
else
_promise.setValue(folly::makeUnexpected(code));
}

private:
Payload const _payload;
std::shared_ptr< grpc_factory > _cli_factory;
std::promise< nuraft::cmd_result_code > _promise;
folly::Promise< NullResult > _promise;
};

template < typename PayloadType >
std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, std::string const& srv_addr = "");
std::shared_ptr< nuraft::req_msg > createMessage(PayloadType payload, peer_id_t const& srv_addr = peer_id_t());

template <>
std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, std::string const& srv_addr) {
assert(!srv_addr.empty());
auto srv_conf = nuraft::srv_config(srv_id, srv_addr);
std::shared_ptr< nuraft::req_msg > createMessage(uint32_t const srv_id, peer_id_t const& srv_addr) {
auto srv_conf = nuraft::srv_config(srv_id, to_string(srv_addr));
auto log = std::make_shared< nuraft::log_entry >(0, srv_conf.serialize(), nuraft::log_val_type::cluster_server);
auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::add_server_request, 0, 0, 0, 0, 0);
msg->log_entries().push_back(log);
return msg;
}

template <>
std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, std::string const&) {
std::shared_ptr< nuraft::req_msg > createMessage(std::shared_ptr< nuraft::buffer > buf, peer_id_t const&) {
auto log = std::make_shared< nuraft::log_entry >(0, buf);
auto msg = std::make_shared< nuraft::req_msg >(0, nuraft::msg_type::client_request, 0, 1, 0, 0, 0);
msg->log_entries().push_back(log);
return msg;
}

template <>
std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, std::string const&) {
std::shared_ptr< nuraft::req_msg > createMessage(int32_t const srv_id, peer_id_t const&) {
auto buf = nuraft::buffer::alloc(sizeof(srv_id));
buf->put(srv_id);
buf->pos(0);
Expand Down Expand Up @@ -145,16 +156,16 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
if (_clients.end() != it) {
if (!happened) {
LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); err) {
LOGERROR("Failed to re-initialize client {}: {}", client, err.message());
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
}
} else {
LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client);
if (auto err = create_client(client, it->second); err) {
LOGERROR("Failed to create client for {}: {}", client, err.message());
if (auto err = create_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to create client for {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand All @@ -165,15 +176,10 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
return new_client;
}

std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const srv_id, std::string const& srv_addr,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::add_server(uint32_t const srv_id, peer_id_t const& srv_addr,
nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx = std::make_shared< client_ctx< uint32_t > >(srv_id, shared_from_this(), dest_cfg.get_id(), srv_addr);
auto handler = static_cast< nuraft::rpc_handler >(
Expand All @@ -186,15 +192,9 @@ std::future< nuraft::cmd_result_code > grpc_factory::add_server(uint32_t const s
return ctx->future();
}

std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const srv_id,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx = std::make_shared< client_ctx< int32_t > >(srv_id, shared_from_this(), dest_cfg.get_id());
auto handler = static_cast< nuraft::rpc_handler >(
Expand All @@ -207,15 +207,9 @@ std::future< nuraft::cmd_result_code > grpc_factory::rem_server(uint32_t const s
return ctx->future();
}

std::future< nuraft::cmd_result_code > grpc_factory::client_request(std::shared_ptr< nuraft::buffer > buf,
nuraft::srv_config const& dest_cfg) {
NullAsyncResult grpc_factory::append_entry(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg) {
auto client = create_client(dest_cfg.get_endpoint());
assert(client);
if (!client) {
std::promise< nuraft::cmd_result_code > p;
p.set_value(nuraft::CANCELLED);
return p.get_future();
}
if (!client) { return folly::makeUnexpected(nuraft::CANCELLED); }

auto ctx =
std::make_shared< client_ctx< std::shared_ptr< nuraft::buffer > > >(buf, shared_from_this(), dest_cfg.get_id());
Expand Down
32 changes: 0 additions & 32 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <sisl/options/options.h>
#include <sisl/grpc/rpc_server.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <system_error>

#include "service.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
Expand All @@ -28,37 +27,6 @@ constexpr auto grpc_server_threads = 1u;

namespace nuraft_mesg {

static std::error_condition convertToError(nuraft::cmd_result_code rc) {
switch (rc) {
case nuraft::OK:
return std::error_condition();
case nuraft::CANCELLED:
return std::make_error_condition(std::errc::operation_canceled);
case nuraft::TIMEOUT:
return std::make_error_condition(std::errc::timed_out);
case nuraft::NOT_LEADER:
return std::make_error_condition(std::errc::permission_denied);
case nuraft::BAD_REQUEST:
return std::make_error_condition(std::errc::invalid_argument);
case nuraft::SERVER_ALREADY_EXISTS:
return std::make_error_condition(std::errc::file_exists);
case nuraft::CONFIG_CHANGING:
return std::make_error_condition(std::errc::interrupted);
case nuraft::SERVER_IS_JOINING:
return std::make_error_condition(std::errc::device_or_resource_busy);
case nuraft::SERVER_NOT_FOUND:
return std::make_error_condition(std::errc::no_such_device);
case nuraft::CANNOT_REMOVE_LEADER:
return std::make_error_condition(std::errc::not_supported);
case nuraft::SERVER_IS_LEAVING:
return std::make_error_condition(std::errc::owner_dead);
case nuraft::FAILED:
[[fallthrough]];
default:
return std::make_error_condition(std::errc::io_error);
}
}

int32_t to_server_id(peer_id_t const& server_addr) {
boost::hash< boost::uuids::uuid > uuid_hasher;
return uuid_hasher(server_addr) >> 33;
Expand Down
2 changes: 0 additions & 2 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
#pragma once

#include <condition_variable>
#include <future>
#include <libnuraft/async.hxx>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <system_error>

#include <sisl/logging/logging.h>

Expand Down
28 changes: 14 additions & 14 deletions src/lib/mesg_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,24 @@ class group_client : public grpc_base_client {
}
};

std::error_condition mesg_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code mesg_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
// Re-direct this call to a global factory so we can re-use clients to the same endpoints
LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client);
auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(to_string(client)));
if (!m_client) { return std::make_error_condition(std::errc::connection_aborted); }
if (!m_client) return nuraft::CANCELLED;
raft_client = std::make_shared< group_client >(m_client, client, _group_name, _group_type, _metrics);
return (!raft_client) ? std::make_error_condition(std::errc::invalid_argument) : std::error_condition();
return (!raft_client) ? nuraft::BAD_REQUEST : nuraft::OK;
}

std::error_condition mesg_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code mesg_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client);
auto g_client = std::dynamic_pointer_cast< group_client >(raft_client);
auto new_raft_client = std::static_pointer_cast< nuraft::rpc_client >(g_client->realClient());
if (auto err = _group_factory->reinit_client(client, new_raft_client); err) { return err; }
g_client->setClient(std::dynamic_pointer_cast< messaging_client >(new_raft_client));
return std::error_condition();
return nuraft::OK;
}

AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string const& request_name,
Expand All @@ -171,27 +171,27 @@ group_factory::group_factory(int const cli_thread_count, group_id_t const& name,
m_ssl_cert = ssl_cert;
}

std::error_condition group_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client);
auto endpoint = lookupEndpoint(client);
if (endpoint.empty()) { return std::make_error_condition(std::errc::invalid_argument); }
if (endpoint.empty()) nuraft::BAD_REQUEST;

LOGDEBUGMOD(nuraft_mesg, "Creating client for [{}] @ [{}]", client, endpoint);
raft_client =
sisl::GrpcAsyncClient::make< messaging_client >(workerName(), endpoint, m_token_client, "", m_ssl_cert);
return (!raft_client) ? std::make_error_condition(std::errc::connection_aborted) : std::error_condition();
return (!raft_client) ? nuraft::CANCELLED : nuraft::OK;
}

std::error_condition group_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code group_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client);
assert(raft_client);
auto mesg_client = std::dynamic_pointer_cast< messaging_client >(raft_client);
if (!mesg_client->is_connection_ready() || 0 < mesg_client->bad_service.load(std::memory_order_relaxed)) {
return create_client(client, raft_client);
}
return std::error_condition();
return nuraft::OK;
}

} // namespace nuraft_mesg
Loading

0 comments on commit 4ae2a0e

Please sign in to comment.