Skip to content

Commit

Permalink
Merge pull request #74 from raakella1/client_blob
Browse files Browse the repository at this point in the history
Offload serialize and deserialize grpc buffers to sisl
  • Loading branch information
raakella1 authored Feb 12, 2024
2 parents 495998f + a3893c5 commit d70fede
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 58 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.4.1"
version = "3.0.1"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
4 changes: 4 additions & 0 deletions include/nuraft_mesg/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ SISL_LOGGING_DECL(nuraft_mesg)

#define NURAFTMESG_LOG_MODS nuraft_mesg, grpc_server

namespace sisl {
class GenericClientResponse;
} // namespace sisl

namespace nuraft_mesg {

using peer_id_t = boost::uuids::uuid;
Expand Down
5 changes: 3 additions & 2 deletions include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ class mesg_factory final : public grpc_factory {
NullAsyncResult data_service_request_unidirectional(std::optional< Result< peer_id_t > > const& dest,
std::string const& request_name, io_blob_list_t const& cli_buf);

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::optional< Result< peer_id_t > > const&,
std::string const&, io_blob_list_t const&);
AsyncResult< sisl::GenericClientResponse >
data_service_request_bidirectional(std::optional< Result< peer_id_t > > const&, std::string const&,
io_blob_list_t const&);
};

} // namespace nuraft_mesg
12 changes: 6 additions & 6 deletions include/nuraft_mesg/mesg_state_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
namespace nuraft {
class raft_server;
class state_machine;
}
} // namespace nuraft

namespace sisl {
class GenericRpcData;
}
} // namespace sisl

namespace nuraft_mesg {

Expand Down Expand Up @@ -46,7 +46,7 @@ class repl_service_ctx {
nuraft::raft_server* _server;
bool is_raft_leader() const;
const std::string& raft_leader_id() const;
std::vector< peer_info >get_raft_status() const;
std::vector< peer_info > get_raft_status() const;

// return a list of replica configs for the peers of the raft group
void get_cluster_config(std::list< replica_config >& cluster_config) const;
Expand All @@ -55,9 +55,9 @@ class repl_service_ctx {
virtual NullAsyncResult data_service_request_unidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;
virtual AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;
virtual AsyncResult< sisl::GenericClientResponse >
data_service_request_bidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;

// Send response to a data service request and finish the async call.
virtual void send_data_service_response(io_blob_list_t const& outgoing_buf,
Expand Down
20 changes: 0 additions & 20 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,6 @@

namespace nuraft_mesg {

[[maybe_unused]] static void serialize_to_byte_buffer(grpc::ByteBuffer& cli_byte_buf, io_blob_list_t const& cli_buf) {
folly::small_vector< grpc::Slice, 4 > slices;
for (auto const& blob : cli_buf) {
slices.emplace_back(blob.cbytes(), blob.size(), grpc::Slice::STATIC_SLICE);
}
cli_byte_buf.Clear();
grpc::ByteBuffer tmp(slices.data(), cli_buf.size());
cli_byte_buf.Swap(&tmp);
}

[[maybe_unused]] static grpc::Status deserialize_from_byte_buffer(grpc::ByteBuffer const& cli_byte_buf,
sisl::io_blob& cli_buf) {
grpc::Slice slice;
auto status = cli_byte_buf.TrySingleSlice(&slice);
if (!status.ok()) { return status; }
cli_buf.set_bytes(slice.begin());
cli_buf.set_size(slice.size());
return status;
}

// generic rpc server looks up rpc name in a map and calls the corresponding callback. To avoid another lookup in this
// layer, we registed one callback for each (group_id, request_name) pair. The rpc_name is their concatenation.
[[maybe_unused]] static std::string get_generic_method_name(std::string const& request_name,
Expand Down
10 changes: 5 additions & 5 deletions src/lib/repl_service_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/uuid/string_generator.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
#include <libnuraft/cluster_config.hxx>

#include "nuraft_mesg/mesg_factory.hpp"
Expand Down Expand Up @@ -61,9 +62,9 @@ NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(desti
: m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf);
}

AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
AsyncResult< sisl::GenericClientResponse >
repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) {
return (!m_mesg_factory)
? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND)
: m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf);
Expand Down Expand Up @@ -121,8 +122,7 @@ repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_pt

void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
serialize_to_byte_buffer(rpc_data->response(), outgoing_buf);
rpc_data->send_response();
rpc_data->send_response(outgoing_buf);
}

} // namespace nuraft_mesg
6 changes: 3 additions & 3 deletions src/lib/repl_service_ctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class repl_service_ctx_grpc : public repl_service_ctx {

NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) override;
AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) override;
AsyncResult< sisl::GenericClientResponse >
data_service_request_bidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) override;
void send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override;

Expand Down
28 changes: 10 additions & 18 deletions src/proto/proto_mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha

NullAsyncResult data_service_request_unidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
grpc::ByteBuffer cli_byte_buf;
serialize_to_byte_buffer(cli_byte_buf, cli_buf);
return _generic_stub
->call_unary(cli_byte_buf, request_name,
NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> NullResult {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
Expand All @@ -93,21 +90,16 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
});
}

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
grpc::ByteBuffer cli_byte_buf;
serialize_to_byte_buffer(cli_byte_buf, cli_buf);
AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _generic_stub
->call_unary(cli_byte_buf, request_name,
NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> Result< sisl::io_blob > {
->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> Result< sisl::GenericClientResponse > {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
sisl::io_blob svr_buf;
deserialize_from_byte_buffer(response.value(), svr_buf);
return svr_buf;
return std::move(response.value());
});
}

Expand Down Expand Up @@ -157,8 +149,8 @@ class grpc_proto_client : public grpc_base_client {
return _client->data_service_request_unidirectional(request_name, cli_buf);
}

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _client->data_service_request_bidirectional(request_name, cli_buf);
}
};
Expand Down Expand Up @@ -208,10 +200,10 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional<
// We ignore the vector of future response from collect all and st the value as folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; });
return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; });
}

AsyncResult< sisl::io_blob >
AsyncResult< sisl::GenericClientResponse >
mesg_factory::data_service_request_bidirectional(std::optional< Result< peer_id_t > > const& dest,
std::string const& request_name, io_blob_list_t const& cli_buf) {
std::shared_lock< client_factory_lock_type > rl(_client_lock);
Expand Down
2 changes: 1 addition & 1 deletion src/tests/data_service_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST_F(DataServiceFixture, BasicTest1) {

results.push_back(sm4_1->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, cli_buf)
.deferValue([](auto e) -> NullResult {
test_state_mgr::verify_data(e.value());
test_state_mgr::verify_data(e.value().response_blob());
return folly::Unit();
}));

Expand Down
3 changes: 2 additions & 1 deletion src/tests/test_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <nlohmann/json.hpp>
#include <libnuraft/state_machine.hxx>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>

#include "nuraft_mesg/nuraft_mesg.hpp"
#include "jungle_logstore/jungle_log_store.h"
Expand Down Expand Up @@ -176,7 +177,7 @@ void test_state_mgr::leave() {}

///// data service api helpers

nuraft_mesg::AsyncResult< sisl::io_blob >
nuraft_mesg::AsyncResult< sisl::GenericClientResponse >
test_state_mgr::data_service_request_bidirectional(nuraft_mesg::destination_t const& dest,
std::string const& request_name,
nuraft_mesg::io_blob_list_t const& cli_buf) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr {
void leave() override;

///// data service helper apis
nuraft_mesg::AsyncResult< sisl::io_blob >
nuraft_mesg::AsyncResult< sisl::GenericClientResponse >
data_service_request_bidirectional(nuraft_mesg::destination_t const& dest, std::string const& request_name,
nuraft_mesg::io_blob_list_t const& cli_buf);
nuraft_mesg::NullAsyncResult data_service_request_unidirectional(nuraft_mesg::destination_t const& dest,
Expand Down

0 comments on commit d70fede

Please sign in to comment.