Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 3, 2023
1 parent a2c4749 commit 2186f32
Show file tree
Hide file tree
Showing 24 changed files with 359 additions and 392 deletions.
6 changes: 2 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10)
project(nuraft_mesg)
enable_testing()

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)

if(EXISTS ${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
Expand All @@ -11,10 +11,8 @@ else()
message(WARNING "Conan Build file does not exist, trying to build without!")
endif()

if (${CMAKE_BUILD_TYPE} STREQUAL Debug)
if (NOT ${CONAN_SETTINGS_COMPILER} STREQUAL "clang" AND NOT ${CONAN_SETTINGS_COMPILER} STREQUAL "apple-clang")
if (${CMAKE_BUILD_TYPE} STREQUAL Debug AND ${CMAKE_CXX_COMPILER_ID} STREQUAL GNU)
include (cmake/debug_flags.cmake)
endif()
endif()
if (${MEMORY_SANITIZER_ON})
include (cmake/mem_sanitizer.cmake)
Expand Down
59 changes: 59 additions & 0 deletions src/include/nuraft_mesg/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <folly/Expected.h>
#include <folly/small_vector.h>
#include <folly/Unit.h>
#include <folly/futures/Future.h>

#include <libnuraft/async.hxx>
#include <sisl/fds/buffer.hpp>
#include <sisl/logging/logging.h>

SISL_LOGGING_DECL(nuraft)
SISL_LOGGING_DECL(nuraft_mesg)

namespace nuraft_mesg {

using peer_id_t = boost::uuids::uuid;
using group_id_t = boost::uuids::uuid;
using group_type_t = std::string;

using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

template < typename T >
using Result = folly::Expected< T, nuraft::cmd_result_code >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

} // namespace nuraft_mesg

namespace fmt {
template <>
struct formatter< nuraft_mesg::group_id_t > {
template < typename ParseContext >
constexpr auto parse(ParseContext& ctx) {
return ctx.begin();
}

template < typename FormatContext >
auto format(nuraft_mesg::group_id_t const& n, FormatContext& ctx) {
return format_to(ctx.out(), "{}", to_string(n));
}
};

template <>
struct formatter< nuraft::cmd_result_code > {
template < typename ParseContext >
constexpr auto parse(ParseContext& ctx) {
return ctx.begin();
}

template < typename FormatContext >
auto format(nuraft::cmd_result_code const& c, FormatContext& ctx) {
return format_to(ctx.out(), "{}", int32_t(c));
}
};
} // namespace fmt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <folly/SharedMutex.h>
#include <libnuraft/nuraft.hxx>

#include "common.hpp"

namespace nuraft_mesg {

using client_factory_lock_type = folly::SharedMutex;
Expand All @@ -33,7 +35,7 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share

protected:
client_factory_lock_type _client_lock;
std::map< std::string, std::shared_ptr< nuraft::rpc_client > > _clients;
std::map< peer_id_t, std::shared_ptr< nuraft::rpc_client > > _clients;

public:
grpc_factory(int const cli_thread_count, std::string const& name);
Expand All @@ -43,9 +45,9 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share

nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override;

virtual std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

virtual std::error_condition reinit_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

// Construct and send an AddServer message to the cluster
std::future< nuraft::cmd_result_code > add_server(uint32_t const srv_id, std::string const& srv_addr,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,3 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

// Brief:
// Translates and forwards the gRPC Step() to cornerstone's raft_server::step()
//
#pragma once

#include <libnuraft/raft_server_handler.hxx>
Expand All @@ -29,6 +11,9 @@ namespace nuraft_mesg {

class RaftMessage;

// Brief:
// Translates and forwards the gRPC Step() to cornerstone's raft_server::step()
//
class grpc_server : public nuraft::raft_server_handler {
std::shared_ptr< nuraft::raft_server > _raft_server;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <mutex>
#include <string>

#include <boost/uuid/uuid_io.hpp>

#include "grpc_factory.hpp"
#include <sisl/logging/logging.h>
#include <sisl/metrics/metrics.hpp>
#include "mesg_service.hpp"
#include "nuraft_mesg.hpp"

namespace sisl {
struct io_blob;
Expand All @@ -35,37 +37,37 @@ class group_factory : public grpc_factory {
static std::string m_ssl_cert;

public:
group_factory(int const cli_thread_count, std::string const& name,
group_factory(int const cli_thread_count, boost::uuids::uuid const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override;
std::error_condition reinit_client(std::string const& client,
std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

virtual std::string lookupEndpoint(std::string const& client) = 0;
virtual std::string lookupEndpoint(peer_id_t const& client) = 0;
};

class mesg_factory final : public grpc_factory {
std::shared_ptr< group_factory > _group_factory;
group_name_t const _group_name;
group_id_t const _group_name;
group_type_t const _group_type;
std::shared_ptr< sisl::MetricsGroupWrapper > _metrics;

public:
mesg_factory(std::shared_ptr< group_factory > g_factory, group_name_t const& grp_id, group_type_t const& grp_type,
mesg_factory(std::shared_ptr< group_factory > g_factory, group_id_t const& grp_id, group_type_t const& grp_type,
std::shared_ptr< sisl::MetricsGroupWrapper > metrics = nullptr) :
grpc_factory(0, grp_id),
grpc_factory(0, to_string(grp_id)),
_group_factory(g_factory),
_group_name(grp_id),
_group_type(grp_type),
_metrics(metrics) {}

group_name_t group_name() const { return _group_name; }
group_id_t group_name() const { return _group_name; }

std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;
std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;

std::error_condition reinit_client(const std::string& client,
std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

#include <memory>

#include <folly/Expected.h>
#include <folly/small_vector.h>
#include <folly/Unit.h>
#include <folly/futures/Future.h>

#include <libnuraft/nuraft.hxx>
#include <sisl/fds/buffer.hpp>

#include "common.hpp"

namespace boost {
template < class T >
Expand All @@ -24,13 +20,6 @@ namespace nuraft_mesg {
class mesg_factory;
class grpc_server;

using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

template < typename T >
using Result = folly::Expected< T, std::error_condition >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

class repl_service_ctx {
public:
repl_service_ctx(grpc_server* server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
#include <libnuraft/nuraft.hxx>
#include <sisl/logging/logging.h>

#include "common.hpp"
#include "mesg_state_mgr.hpp"

SISL_LOGGING_DECL(nuraft)

namespace grpc {
class ByteBuffer;
class Status;
Expand All @@ -40,9 +39,6 @@ using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::

namespace nuraft_mesg {

using group_name_t = std::string;
using group_type_t = std::string;

// called by the server after it receives the request
using data_service_request_handler_t =
std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;
Expand All @@ -53,14 +49,14 @@ using NullAsyncResult = AsyncResult< folly::Unit >;
class MessagingApplication {
public:
virtual ~MessagingApplication() = default;
virtual std::string lookup_peer(std::string const&) = 0;
virtual std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, group_name_t const& group_id) = 0;
virtual std::string lookup_peer(peer_id_t const&) = 0;
virtual std::shared_ptr< mesg_state_mgr > create_state_mgr(int32_t const srv_id, group_id_t const& group_id) = 0;
};

class Manager {
public:
struct Params {
std::string server_uuid_;
boost::uuids::uuid server_uuid_;
uint32_t mesg_port_;
group_type_t default_group_type_;
std::string ssl_key_;
Expand All @@ -74,32 +70,33 @@ class Manager {
// Register a new group type
virtual void register_mgr_type(group_type_t const& group_type, group_params const&) = 0;

virtual std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_name_t const& group_id) const = 0;
virtual NullAsyncResult create_group(group_name_t const& group_id, group_type_t const& group_type) = 0;
virtual NullResult join_group(group_name_t const& group_id, group_type_t const& group_type,
virtual std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_id_t const& group_id) const = 0;
virtual NullAsyncResult create_group(group_id_t const& group_id, group_type_t const& group_type) = 0;
virtual NullResult join_group(group_id_t const& group_id, group_type_t const& group_type,
std::shared_ptr< mesg_state_mgr >) = 0;

// Send a client request to the cluster
virtual NullAsyncResult add_member(group_name_t const& group_id, std::string const& server_id) = 0;
virtual NullAsyncResult rem_member(group_name_t const& group_id, std::string const& server_id) = 0;
virtual NullAsyncResult become_leader(group_name_t const& group_id) = 0;
virtual NullAsyncResult client_request(group_name_t const& group_id, std::shared_ptr< nuraft::buffer >&) = 0;
virtual NullAsyncResult add_member(group_id_t const& group_id, peer_id_t const& server_id) = 0;
virtual NullAsyncResult rem_member(group_id_t const& group_id, peer_id_t const& server_id) = 0;
virtual NullAsyncResult become_leader(group_id_t const& group_id) = 0;
virtual NullAsyncResult append_entries(group_id_t const& group_id,
std::vector< std::shared_ptr< nuraft::buffer > > const&) = 0;

// Misc Mgmt
virtual void get_srv_config_all(group_name_t const& group_id,
virtual void get_srv_config_all(group_id_t const& group_id,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) = 0;
virtual void leave_group(group_name_t const& group_id) = 0;
virtual void append_peers(group_name_t const& group_id, std::list< std::string >&) const = 0;
virtual uint32_t logstore_id(group_name_t const& group_id) const = 0;
virtual void leave_group(group_id_t const& group_id) = 0;
virtual void append_peers(group_id_t const& group_id, std::list< peer_id_t >&) const = 0;
virtual uint32_t logstore_id(group_id_t const& group_id) const = 0;
virtual int32_t server_id() const = 0;
virtual void restart_server() = 0;

// data channel APIs
virtual bool bind_data_service_request(std::string const& request_name, group_name_t const& group_id,
virtual bool bind_data_service_request(std::string const& request_name, group_id_t const& group_id,
data_service_request_handler_t const&) = 0;
};

extern int32_t to_server_id(std::string const& server_addr);
extern int32_t to_server_id(peer_id_t const& server_addr);

extern std::shared_ptr< Manager > init_messaging(Manager::Params const&, std::weak_ptr< MessagingApplication >,
bool with_data_svc = false);
Expand Down
32 changes: 0 additions & 32 deletions src/lib/data_service.hpp

This file was deleted.

6 changes: 3 additions & 3 deletions src/lib/data_service_grpc.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <boost/uuid/uuid_io.hpp>
#include <sisl/grpc/generic_service.hpp>

#include "data_service_grpc.hpp"
#include "utils.hpp"

SISL_LOGGING_DECL(nuraft_mesg)

namespace nuraft_mesg {

void data_service_grpc::set_grpc_server(sisl::GrpcServer* server) { _grpc_server = server; }
Expand All @@ -23,7 +23,7 @@ void data_service_grpc::bind() {
}
}

bool data_service_grpc::bind(std::string const& request_name, std::string const& group_id,
bool data_service_grpc::bind(std::string const& request_name, group_id_t const& group_id,
data_service_request_handler_t const& request_cb) {
RELEASE_ASSERT(_grpc_server, "NULL _grpc_server!");
if (!request_cb) {
Expand Down
Loading

0 comments on commit 2186f32

Please sign in to comment.