Skip to content

Commit

Permalink
More cleanup (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd authored Nov 11, 2023
1 parent d7e3366 commit 3080c67
Show file tree
Hide file tree
Showing 21 changed files with 287 additions and 301 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ ignore:
- "**/*.pb.h"
- "**/tests/*"
- "**/jungle_logstore/*"
- "**/flatb/*"
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.0.3"
version = "2.0.4"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
63 changes: 0 additions & 63 deletions src/include/nuraft_mesg/grpc_factory.hpp

This file was deleted.

45 changes: 40 additions & 5 deletions src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,59 @@
*********************************************************************************/
#pragma once

#include <libnuraft/async.hxx>
#include <memory>
#include <mutex>
#include <string>

#include <boost/uuid/uuid_io.hpp>

#include "grpc_factory.hpp"
#include <folly/SharedMutex.h>
#include <sisl/logging/logging.h>
#include <sisl/metrics/metrics.hpp>
#include "nuraft_mesg.hpp"
#include <libnuraft/nuraft.hxx>

#include "common.hpp"

namespace sisl {
struct io_blob;
}
class GrpcTokenClient;
} // namespace sisl

namespace nuraft_mesg {

using client_factory_lock_type = folly::SharedMutex;
// Brief:
// Implements cornerstone's rpc_client_factory providing sisl::GrpcAsyncClient
// inherited rpc_client instances sharing a common worker pool.
class grpc_factory : public nuraft::rpc_client_factory, public std::enable_shared_from_this< grpc_factory > {
std::string _worker_name;

protected:
client_factory_lock_type _client_lock;
std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients;

public:
grpc_factory(int const cli_thread_count, std::string const& name);
~grpc_factory() override = default;

std::string const& workerName() const { return _worker_name; }

nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override;
nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client);

virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

// Construct and send an AddServer message to the cluster
NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg);

// Send a client request to the cluster
NullAsyncResult append_entry(std::shared_ptr< nuraft::buffer > buf, nuraft::srv_config const& dest_cfg);

// Construct and send a RemoveServer message to the cluster
NullAsyncResult rem_server(uint32_t const srv_id, nuraft::srv_config const& dest_cfg);
};

class group_factory : public grpc_factory {
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;
static std::string m_ssl_cert;
Expand Down
6 changes: 1 addition & 5 deletions src/include/nuraft_mesg/mesg_state_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@

#include <memory>

#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <libnuraft/nuraft.hxx>

#include "common.hpp"

namespace boost {
template < class T >
class intrusive_ptr;
} // namespace boost

namespace sisl {
class GenericRpcData;
}
Expand Down
6 changes: 4 additions & 2 deletions src/include/nuraft_mesg/nuraft_mesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@
#include <string>
#include <vector>

#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <libnuraft/nuraft.hxx>
#include <sisl/logging/logging.h>

#include "common.hpp"
#include "mesg_state_mgr.hpp"

namespace grpc {
class ByteBuffer;
class Status;
} // namespace grpc

namespace sisl {
class GenericRpcData;
class GrpcTokenVerifier;
class GrpcTokenClient;
using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::Status& status) >;
} // namespace sisl

namespace nuraft_mesg {

class mesg_state_mgr;

// called by the server after it receives the request
using data_service_request_handler_t =
std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;
Expand Down
13 changes: 7 additions & 6 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ add_library(${PROJECT_NAME})
target_sources(${PROJECT_NAME} PRIVATE
manager_impl.cpp
factory.cpp
repl_service_ctx.cpp
service.cpp
grpc_server.cpp
)
target_link_libraries(${PROJECT_NAME}
sisl::sisl
nuraft::nuraft
)
sisl::sisl
nuraft::nuraft
)

add_library(${PROJECT_NAME}-data-svc OBJECT)
target_sources(${PROJECT_NAME}-data-svc PRIVATE
data_service_grpc.cpp
)
settings_gen_cpp($<TARGET_FILE:flatbuffers::flatc> ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME}-data-svc nuraft_mesg_config.fbs)
target_link_libraries(${PROJECT_NAME}-data-svc
sisl::sisl
nuraft::nuraft
)
sisl::sisl
nuraft::nuraft
)
5 changes: 1 addition & 4 deletions src/lib/factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
// grpc_factory static functions that makes for easy client creation.
//
#include <boost/uuid/string_generator.hpp>
#include <folly/Expected.h>
#include <folly/futures/Future.h>
#include <libnuraft/async.hxx>
#include <sisl/grpc/rpc_client.hpp>

#include "client.hpp"
#include "nuraft_mesg/grpc_factory.hpp"
#include "nuraft_mesg/mesg_factory.hpp"

namespace nuraft_mesg {

Expand Down
74 changes: 4 additions & 70 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
#include <sisl/grpc/rpc_server.hpp>
#include <sisl/grpc/generic_service.hpp>

#include "service.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
#include "nuraft_mesg/mesg_state_mgr.hpp"
#include "nuraft_mesg/nuraft_mesg.hpp"

#include "repl_service_ctx.hpp"
#include "service.hpp"
#include "logger.hpp"

constexpr auto leader_change_timeout = std::chrono::milliseconds(3200);
Expand Down Expand Up @@ -354,75 +357,6 @@ bool ManagerImpl::bind_data_service_request(std::string const& request_name, gro
return _mesg_service->bind_data_service_request(request_name, group_id, request_handler);
}

// The endpoint field of the raft_server config is the uuid of the server.
const std::string repl_service_ctx_grpc::id_to_str(int32_t const id) const {
auto const& srv_config = _server->get_config()->get_server(id);
return (srv_config) ? srv_config->get_endpoint() : std::string();
}

const std::optional< Result< peer_id_t > > repl_service_ctx_grpc::get_peer_id(destination_t const& dest) const {
if (std::holds_alternative< peer_id_t >(dest)) return std::get< peer_id_t >(dest);
if (std::holds_alternative< role_regex >(dest)) {
if (!_server) {
LOGW("server not initialized");
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}
switch (std::get< role_regex >(dest)) {
case role_regex::LEADER: {
if (is_raft_leader()) return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
auto const leader = _server->get_leader();
if (leader == -1) return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
return boost::uuids::string_generator()(id_to_str(leader));
} break;
case role_regex::ALL: {
return std::nullopt;
} break;
default: {
LOGE("Method not implemented");
return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
} break;
}
}
DEBUG_ASSERT(false, "Unknown destination type");
return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
}

NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
return (!m_mesg_factory)
? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND)
: m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf);
}

AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
return (!m_mesg_factory)
? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND)
: m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf);
}

repl_service_ctx::repl_service_ctx(nuraft::raft_server* server) : _server(server) {}

bool repl_service_ctx::is_raft_leader() const { return _server->is_leader(); }

void repl_service_ctx::get_cluster_config(std::list< replica_config >& cluster_config) const {
auto const& srv_configs = _server->get_config()->get_servers();
for (auto const& srv_config : srv_configs) {
cluster_config.emplace_back(replica_config{srv_config->get_endpoint(), srv_config->get_aux()});
}
}

repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) :
repl_service_ctx(server ? server->raft_server().get() : nullptr), m_mesg_factory(cli_factory) {}

void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
serialize_to_byte_buffer(rpc_data->response(), outgoing_buf);
rpc_data->send_response();
}

void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) {
m_repl_svc_ctx = std::make_unique< repl_service_ctx_grpc >(server, cli_factory);
}
Expand Down
20 changes: 1 addition & 19 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <sisl/logging/logging.h>

#include "nuraft_mesg/nuraft_mesg.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
#include "common_lib.hpp"

Expand Down Expand Up @@ -94,23 +95,4 @@ class ManagerImpl : public Manager {
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override;
};

class repl_service_ctx_grpc : public repl_service_ctx {
public:
repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory);
~repl_service_ctx_grpc() override = default;
std::shared_ptr< mesg_factory > m_mesg_factory;

NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) override;
AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) override;
void send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override;

private:
const std::optional< Result< peer_id_t > > get_peer_id(destination_t const& dest) const;
const std::string id_to_str(int32_t const id) const;
};

} // namespace nuraft_mesg
3 changes: 3 additions & 0 deletions src/lib/proto/proto_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms

std::atomic_uint64_t grpc_base_client::_client_counter = 0ul;

///
// This is where the magic of serialization happens starting with creating a RaftMessage and invoking our
// specific ::send() which will later transform into a RaftGroupMsg
void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) {
assert(req && complete);
RaftMessage grpc_request;
Expand Down
Loading

0 comments on commit 3080c67

Please sign in to comment.