Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for FlatBuffers support #45

Merged
merged 4 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.0.2"
version = "2.0.3"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down Expand Up @@ -52,7 +52,7 @@ def configure(self):
raise ConanInvalidConfiguration("Coverage/Sanitizer requires Testing!")

def build_requirements(self):
self.build_requires("gtest/1.13.0")
self.build_requires("gtest/1.14.0")
if (self.options.testing):
self.build_requires("jungle/cci.20221201")

Expand All @@ -61,6 +61,7 @@ def requirements(self):
self.requires("nuraft/2.3.0")

self.requires("boost/1.82.0")
self.requires("flatbuffers/23.5.26")
self.requires("openssl/3.1.1")
self.requires("lz4/1.9.4")

Expand Down Expand Up @@ -102,9 +103,13 @@ def package(self):
copy(self, "*.pb.h", join(self.build_folder, "src"), gen_dir, keep_path=False)

def package_info(self):
self.cpp_info.libs = ["nuraft_mesg"]
self.cpp_info.names["cmake_find_package"] = "NuraftMesg"
self.cpp_info.names["cmake_find_package_multi"] = "NuraftMesg"
self.cpp_info.components["proto"].libs = ["nuraft_mesg", "nuraft_mesg_proto"]
self.cpp_info.components["proto"].requires = ["nuraft::nuraft", "sisl::sisl"]

if self.settings.build_type == "Debug" and self.options.sanitize:
self.cpp_info.sharedlinkflags.append("-fsanitize=address")
self.cpp_info.exelinkflags.append("-fsanitize=address")
self.cpp_info.sharedlinkflags.append("-fsanitize=undefined")
self.cpp_info.exelinkflags.append("-fsanitize=undefined")
self.cpp_info.components["proto"].sharedlinkflags.append("-fsanitize=address")
self.cpp_info.components["proto"].exelinkflags.append("-fsanitize=address")
self.cpp_info.components["proto"].sharedlinkflags.append("-fsanitize=undefined")
self.cpp_info.components["proto"].exelinkflags.append("-fsanitize=undefined")
23 changes: 2 additions & 21 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,9 @@ find_package(sisl REQUIRED)
find_package(nuraft REQUIRED)

include(${sisl_INCLUDE_DIRS}/../cmake/settings_gen.cmake)

add_subdirectory (proto)
include_directories(BEFORE "include" ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR})

add_flags("-Wall -Wextra -Werror -Wpedantic")

add_library(${PROJECT_NAME})
target_sources(${PROJECT_NAME} PRIVATE
lib/grpc_client.cpp
lib/grpc_factory.cpp
lib/grpc_server.cpp
lib/mesg_client.cpp
lib/service.cpp
lib/data_service_grpc.cpp
lib/manager_impl.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}-proto>
)
settings_gen_cpp($<TARGET_FILE:flatbuffers::flatc> ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME} lib/nuraft_mesg_config.fbs)
target_link_libraries(${PROJECT_NAME}
sisl::sisl
nuraft::nuraft
)
target_include_directories(${PROJECT_NAME} PUBLIC "include")
target_include_directories(${PROJECT_NAME} PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")

add_subdirectory (lib)
add_subdirectory(tests)
24 changes: 24 additions & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.11)

add_subdirectory (proto)
add_subdirectory (flatb)

add_library(${PROJECT_NAME})
target_sources(${PROJECT_NAME} PRIVATE
manager_impl.cpp
factory.cpp
)
target_link_libraries(${PROJECT_NAME}
sisl::sisl
nuraft::nuraft
)

add_library(${PROJECT_NAME}-data-svc OBJECT)
target_sources(${PROJECT_NAME}-data-svc PRIVATE
data_service_grpc.cpp
)
settings_gen_cpp($<TARGET_FILE:flatbuffers::flatc> ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME}-data-svc nuraft_mesg_config.fbs)
target_link_libraries(${PROJECT_NAME}-data-svc
sisl::sisl
nuraft::nuraft
)
10 changes: 2 additions & 8 deletions src/lib/grpc_client.hpp → src/lib/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
#include <sisl/grpc/rpc_client.hpp>
#include <sisl/logging/logging.h>

#include "common_lib.hpp"
#include "lib/common_lib.hpp"

namespace nuraft_mesg {

class RaftMessage;

class grpc_resp : public nuraft::resp_msg {
public:
using nuraft::resp_msg::resp_msg;
Expand All @@ -42,17 +40,13 @@ class grpc_base_client : public nuraft::rpc_client {
uint64_t _client_id;

public:
using handle_resp = std::function< void(RaftMessage&, ::grpc::Status&) >;

grpc_base_client() : nuraft::rpc_client::rpc_client(), _client_id(_client_counter++) {}
~grpc_base_client() override = default;

void send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t timeout_ms = 0) override;

bool is_abandoned() const override { return false; }
uint64_t get_id() const override { return _client_id; }

protected:
virtual void send(RaftMessage const& message, handle_resp complete) = 0;
};

template < typename TSERVICE >
Expand Down
32 changes: 32 additions & 0 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <sisl/logging/logging.h>
#include <grpcpp/generic/async_generic_service.h>

#include "nuraft_mesg/common.hpp"

Expand All @@ -10,3 +11,34 @@
#define LOGW(...) LOGWARNMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGE(...) LOGERRORMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGC(...) LOGCRITICALMOD(nuraft_mesg, ##__VA_ARGS__)

namespace nuraft_mesg {

[[maybe_unused]] static void serialize_to_byte_buffer(grpc::ByteBuffer& cli_byte_buf, io_blob_list_t const& cli_buf) {
folly::small_vector< grpc::Slice, 4 > slices;
for (auto const& blob : cli_buf) {
slices.emplace_back(blob.bytes, blob.size, grpc::Slice::STATIC_SLICE);
}
cli_byte_buf.Clear();
grpc::ByteBuffer tmp(slices.data(), cli_buf.size());
cli_byte_buf.Swap(&tmp);
}

[[maybe_unused]] static grpc::Status deserialize_from_byte_buffer(grpc::ByteBuffer const& cli_byte_buf,
sisl::io_blob& cli_buf) {
grpc::Slice slice;
auto status = cli_byte_buf.TrySingleSlice(&slice);
if (!status.ok()) { return status; }
cli_buf.bytes = const_cast< uint8_t* >(slice.begin());
cli_buf.size = slice.size();
return status;
}

// generic rpc server looks up rpc name in a map and calls the corresponding callback. To avoid another lookup in this
// layer, we registed one callback for each (group_id, request_name) pair. The rpc_name is their concatenation.
[[maybe_unused]] static std::string get_generic_method_name(std::string const& request_name,
group_id_t const& group_id) {
return fmt::format("{}|{}", request_name, group_id);
}

} // namespace nuraft_mesg
2 changes: 1 addition & 1 deletion src/lib/data_service_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <sisl/grpc/generic_service.hpp>

#include "data_service_grpc.hpp"
#include "utils.hpp"
#include "common_lib.hpp"

namespace nuraft_mesg {

Expand Down
13 changes: 6 additions & 7 deletions src/lib/grpc_factory.cpp → src/lib/factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
#include <libnuraft/async.hxx>
#include <sisl/grpc/rpc_client.hpp>

#include "grpc_client.hpp"
#include "client.hpp"
#include "nuraft_mesg/grpc_factory.hpp"
#include "proto/raft_types.pb.h"

namespace nuraft_mesg {

Expand Down Expand Up @@ -133,10 +132,10 @@ grpc_factory::grpc_factory(int const cli_thread_count, std::string const& name)
}

class grpc_error_client : public grpc_base_client {
void send(RaftMessage const&, handle_resp complete) override {
auto null_msg = RaftMessage();
auto status = ::grpc::Status(::grpc::ABORTED, "Bad connection");
complete(null_msg, status);
void send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) override {
auto resp = std::shared_ptr< nuraft::resp_msg >();
auto error = std::make_shared< nuraft::rpc_exception >("Bad connection", req);
complete(resp, error);
}
};

Expand All @@ -156,7 +155,7 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
if (!happened) {
LOGD("Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGE("Failed to re-initialize client {}: {}", client, err);
LOGW("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand Down
32 changes: 32 additions & 0 deletions src/lib/flatb/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
cmake_minimum_required(VERSION 3.11)

list(APPEND SCHEMA_FLAGS "--scoped-enums" "--gen-name-strings" "--cpp-std=c++17" "--cpp-static-reflection")

flatbuffers_generate_headers(
TARGET fbschemas
SCHEMAS "raft_types.fbs"
FLAGS ${SCHEMA_FLAGS}
)
flatbuffers_generate_headers(
TARGET fbservice
SCHEMAS "messaging_service.fbs"
FLAGS "--grpc" ${SCHEMA_FLAGS}
)

add_library(${PROJECT_NAME}-flatb OBJECT)
target_sources(${PROJECT_NAME}-flatb PRIVATE
flatb_client.cpp
$<TARGET_PROPERTY:fbservice,INTERFACE_SOURCES>
)
target_include_directories(${PROJECT_NAME}-flatb PRIVATE
$<TARGET_PROPERTY:fbschemas,INTERFACE_INCLUDE_DIRECTORIES>
${CMAKE_CURRENT_BINARY_DIR}
)
target_link_libraries(${PROJECT_NAME}-flatb
sisl::sisl
nuraft::nuraft
flatbuffers::flatbuffers
gRPC::grpc++
)
add_dependencies(${PROJECT_NAME}-flatb GENERATE_fbservice)
add_dependencies(${PROJECT_NAME}-flatb GENERATE_fbschemas)
80 changes: 80 additions & 0 deletions src/lib/flatb/flatb_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

#include <sisl/fds/buffer.hpp>

// Brief:
// grpc_client does the protobuf transformations on nuraft req's
//
#include "flatb_client.hpp"
#include "utils.hpp"
#include "fbschemas/raft_types_generated.h"

namespace nuraft_mesg {

inline auto fromRequest(nuraft::req_msg& rcmsg) {
flatbuffers::FlatBufferBuilder builder(1024);
auto msg_base = MessageBase{rcmsg.get_term(), rcmsg.get_type(), rcmsg.get_src(), rcmsg.get_dst()};
auto entry_vec = std::vector< flatbuffers::Offset< LogEntry > >();
for (auto const& entry : rcmsg.log_entries()) {
auto& buffer = entry->get_buf();
buffer.pos(0);
auto log_buf = builder.CreateVector(buffer.data(), buffer.size());
entry_vec.push_back(CreateLogEntry(builder, entry->get_term(), (LogType)entry->get_val_type(), log_buf,
entry->get_timestamp()));
}
auto log_entries = builder.CreateVector(entry_vec);
auto req = CreateRequest(builder, &msg_base, rcmsg.get_last_log_term(), rcmsg.get_last_log_idx(),
rcmsg.get_commit_idx(), log_entries);
return req;
}

inline std::shared_ptr< nuraft::resp_msg > toResponse(Response const& resp) {
auto const& base = *resp.msg_base();
auto message = std::make_shared< grpc_resp >(base.term(), (nuraft::msg_type)base.type(), base.src(), base.dest(),
resp.next_index(), resp.accepted());
message->set_result_code((nuraft::cmd_result_code)(resp.result_code()));
if (nuraft::cmd_result_code::NOT_LEADER == message->get_result_code()) {
LOGI("Leader has changed!");
message->dest_addr = resp.dest_addr()->str();
}
if (0 < resp.context()->size()) {
auto ctx_buffer = nuraft::buffer::alloc(resp.context()->size());
memcpy(ctx_buffer->data(), resp.context()->data(), resp.context()->size());
message->set_ctx(ctx_buffer);
}
return message;
}

std::atomic_uint64_t grpc_base_client::_client_counter = 0ul;

void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) {
assert(req && complete);
static_cast< grpc_flatb_client* >(this)->send(
fromRequest(*req), [req, complete](Response& response, ::grpc::Status& status) mutable -> void {
std::shared_ptr< nuraft::rpc_exception > err;
std::shared_ptr< nuraft::resp_msg > resp;

if (status.ok()) {
resp = toResponse(response);
if (!resp) { err = std::make_shared< nuraft::rpc_exception >("missing response", req); }
} else {
err = std::make_shared< nuraft::rpc_exception >(status.error_message(), req);
}
complete(resp, err);
});
}

} // namespace nuraft_mesg
41 changes: 41 additions & 0 deletions src/lib/flatb/flatb_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

// Brief:
// Implements cornerstone's rpc_client::send(...) routine to translate
// and execute the call over gRPC asynchrously.
//
#pragma once

#include "lib/client.hpp"

namespace flatbuffers {
template < typename T >
class Offset;
}

namespace nuraft_mesg {

class Request;
class Response;

class grpc_flatb_client : public grpc_base_client {
public:
using handle_resp = std::function< void(Response&, ::grpc::Status&) >;
using grpc_base_client::grpc_base_client;
virtual void send(flatbuffers::Offset< Request > const& request, handle_resp complete) = 0;
};

} // namespace nuraft_mesg
8 changes: 8 additions & 0 deletions src/lib/flatb/messaging_service.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
include "raft_types.fbs";

namespace nuraft_mesg;

rpc_service Messaging {
RaftStep(Request):Response;
}

Loading