Skip to content

Commit

Permalink
Fixup test_package.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 2, 2023
1 parent c0e59de commit 824b779
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 109 deletions.
16 changes: 15 additions & 1 deletion src/include/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,21 @@ namespace nuraft_mesg {
using group_name_t = std::string;
using group_type_t = std::string;

class group_factory;
class group_factory : public grpc_factory {
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;
static std::string m_ssl_cert;

public:
group_factory(int const cli_thread_count, std::string const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override;
std::error_condition reinit_client(std::string const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

virtual std::string lookupEndpoint(std::string const& client) = 0;
};

class mesg_factory final : public grpc_factory {
std::shared_ptr< group_factory > _group_factory;
Expand Down
48 changes: 4 additions & 44 deletions src/include/mesg_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
#include <memory>
#include <string>
#include <system_error>
#include <folly/Expected.h>
#include <folly/small_vector.h>
#include <folly/Unit.h>
#include <folly/futures/Future.h>

#include <libnuraft/nuraft.hxx>
#include <sisl/fds/buffer.hpp>
#include <sisl/logging/logging.h>

#include "mesg_state_mgr.hpp"

SISL_LOGGING_DECL(nuraft)

namespace grpc {
Expand All @@ -37,57 +34,18 @@ class Status;
namespace sisl {
class GrpcTokenVerifier;
class GrpcTokenClient;
class GenericRpcData;
using generic_unary_callback_t = std::function< void(grpc::ByteBuffer&, ::grpc::Status& status) >;
} // namespace sisl

namespace boost {
template < class T >
class intrusive_ptr;
} // namespace boost

namespace nuraft_mesg {

using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

// called by the server after it receives the request
using data_service_request_handler_t =
std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;

// This object can be stored by the caller and can be used to directly call raft/data operatons without taking
// _raft_servers_lock
class grpc_server;

template < typename T >
using Result = folly::Expected< T, std::error_condition >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

using NullResult = Result< folly::Unit >;
using NullAsyncResult = AsyncResult< folly::Unit >;

class repl_service_ctx {
public:
repl_service_ctx(grpc_server* server);
virtual ~repl_service_ctx() = default;

// we do not own this pointer. Use this only if the lyfe cycle of the pointer is well known
grpc_server* m_server;
bool is_raft_leader() const;

// data service api client call
virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;

// Send response to a data service request and finish the async call.
virtual void send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) = 0;
};

extern int32_t to_server_id(std::string const& server_addr);

class mesg_state_mgr;

class MessagingApplication {
public:
virtual ~MessagingApplication() = default;
Expand Down Expand Up @@ -134,6 +92,8 @@ class Manager {
data_service_request_handler_t const&) = 0;
};

extern int32_t to_server_id(std::string const& server_addr);

extern std::shared_ptr< Manager > init_messaging(Manager::Params const&, std::weak_ptr< MessagingApplication >,
bool with_data_svc = false);

Expand Down
43 changes: 43 additions & 0 deletions src/include/mesg_state_mgr.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,53 @@
#pragma once

#include <memory>

#include <folly/Expected.h>
#include <folly/small_vector.h>
#include <folly/Unit.h>
#include <folly/futures/Future.h>

#include <libnuraft/nuraft.hxx>
#include <sisl/fds/buffer.hpp>

namespace boost {
template < class T >
class intrusive_ptr;
} // namespace boost

namespace sisl {
class GenericRpcData;
}

namespace nuraft_mesg {

class mesg_factory;
class grpc_server;

using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

template < typename T >
using Result = folly::Expected< T, std::error_condition >;
template < typename T >
using AsyncResult = folly::SemiFuture< Result< T > >;

class repl_service_ctx {
public:
repl_service_ctx(grpc_server* server);
virtual ~repl_service_ctx() = default;

// we do not own this pointer. Use this only if the lyfe cycle of the pointer is well known
grpc_server* m_server;
bool is_raft_leader() const;

// data service api client call
virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;

// Send response to a data service request and finish the async call.
virtual void send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) = 0;
};

class mesg_state_mgr : public nuraft::state_mgr {
public:
Expand Down
2 changes: 2 additions & 0 deletions src/lib/grpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*
*********************************************************************************/

#include <sisl/fds/buffer.hpp>

// Brief:
// grpc_client does the protobuf transformations on nuraft req's
//
Expand Down
2 changes: 1 addition & 1 deletion src/lib/messaging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "service.hpp"
#include "mesg_factory.hpp"
#include "mesg_state_mgr.hpp"
#include "mesg_service.hpp"
#include "logger.hpp"
#include "utils.hpp"

Expand Down
2 changes: 1 addition & 1 deletion src/lib/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "libnuraft/rpc_listener.hxx"
#include "service.hpp"
#include "mesg_factory.hpp"
#include "mesg_state_mgr.hpp"
#include "mesg_service.hpp"

SISL_LOGGING_DECL(nuraft_mesg)

Expand Down
18 changes: 0 additions & 18 deletions src/lib/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,4 @@ static std::string get_generic_method_name(std::string const& request_name, std:
return fmt::format("{}|{}", request_name, group_id);
}

class group_factory : public grpc_factory {
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;
static std::string m_ssl_cert;

public:
group_factory(int const cli_thread_count, std::string const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;

std::error_condition create_client(const std::string& client, nuraft::ptr< nuraft::rpc_client >&) override;

std::error_condition reinit_client(std::string const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

virtual std::string lookupEndpoint(std::string const& client) = 0;
};

} // namespace nuraft_mesg
1 change: 0 additions & 1 deletion src/tests/test_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#pragma once

#include "mesg_service.hpp"
#include "mesg_state_mgr.hpp"
#include <sisl/logging/logging.h>

class test_state_machine;
Expand Down
95 changes: 53 additions & 42 deletions test_package/example_server.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <cassert>
#include <csignal>

#include <nuraft_mesg/messaging.hpp>
#include <nuraft_mesg/mesg_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
#include <sisl/grpc/rpc_server.hpp>
#include <sisl/logging/logging.h>
Expand Down Expand Up @@ -45,6 +45,55 @@ void handle(int signal) {
}
}

class Application : public nuraft_mesg::MessagingApplication, public std::enable_shared_from_this< Application > {
public:
std::string name_;
uint32_t port_;
std::string id_;
std::shared_ptr< nuraft_mesg::Manager > manager_;

Application(std::string const& name, uint32_t port) : name_(name), port_(port) { id_ = name; }
~Application() override = default;

std::string lookup_peer(std::string const& peer) override {
// Provide a method for the service layer to lookup an IPv4:port address
// from a uuid; however the process wants to do that.
for (auto i = 0u; i < 5; ++i) {
if (uuids[i] == peer) { return fmt::format(FMT_STRING("127.0.0.1:{}"), 9000 + i); }
}
RELEASE_ASSERT(false, "Missing Peer: {}", peer);
}

std::shared_ptr< nuraft_mesg::mesg_state_mgr > create_state_mgr(int32_t const srv_id,
std::string const& group_id) override {
// Each group has a type so we can attach different state_machines upon Join request.
// This callback should provide a mechanism to return a new state_manager.
auto [it, _] = state_mgr_map.emplace(
std::make_pair(group_id + "_" + name_, std::make_shared< simple_state_mgr >(srv_id, id_, group_id)));
return std::static_pointer_cast< nuraft_mesg::mesg_state_mgr >(it->second);
}

void start() {
auto params = nuraft_mesg::Manager::Params();
params.server_uuid_ = id_;
params.mesg_port_ = port_;
params.default_group_type_ = "test_package";
manager_ = init_messaging(params, weak_from_this(), false);
auto r_params = nuraft::raft_params()
.with_election_timeout_lower(elect_to_low)
.with_election_timeout_upper(elect_to_high)
.with_hb_interval(heartbeat_period)
.with_max_append_size(10)
.with_rpc_failure_backoff(rpc_backoff)
.with_auto_forwarding(true)
.with_snapshot_enabled(0);
manager_->register_mgr_type(params.default_group_type_, r_params);
}

private:
std::map< std::string, std::shared_ptr< simple_state_mgr > > state_mgr_map;
};

int main(int argc, char** argv) {
SISL_OPTIONS_LOAD(argc, argv, logging, server, nuraft_mesg);

Expand All @@ -64,46 +113,8 @@ int main(int argc, char** argv) {
auto const server_port = 9000 + offset_id;
LOGINFO("Server starting as: [{}], port: [{}]", server_uuid, server_port);

// Provide a method for the service layer to lookup an IPv4:port address
// from a uuid; however the process wants to do that.
auto messaging_params =
nuraft_mesg::consensus_component::params{server_uuid, server_port,
[](std::string const& client) -> std::string {
for (auto i = 0u; i < 5; ++i) {
if (uuids[i] == client) {
return fmt::format(FMT_STRING("127.0.0.1:{}"),
9000 + i);
}
}
return client;
},
"none"};

// Intitialize the messaging layer.
auto messaging = nuraft_mesg::service();

// RAFT server parameters
nuraft::raft_params r_params;
r_params.with_election_timeout_lower(elect_to_low)
.with_election_timeout_upper(elect_to_high)
.with_hb_interval(heartbeat_period)
.with_max_append_size(10)
.with_rpc_failure_backoff(rpc_backoff)
.with_auto_forwarding(true)
.with_snapshot_enabled(0);
// Each group has a type so we can attach different state_machines upon Join request.
// This callback should provide a mechanism to return a new state_manager.
auto group_type_params = nuraft_mesg::consensus_component::register_params{
r_params,
[server_uuid](int32_t const srv_id,
std::string const& group_id) -> std::shared_ptr< nuraft_mesg::mesg_state_mgr > {
return std::make_shared< simple_state_mgr >(srv_id, server_uuid, group_id);
}};
messaging.register_mgr_type("test_package", group_type_params);

// This will start the RPC service and begin listening for incomming JOIN groups request.
// You can also call create_group and join_group following this operation.
messaging.start(messaging_params);
auto app = std::make_shared< Application >(server_uuid, server_port);
app->start();

{
auto lck = std::lock_guard< std::mutex >(k_stop_cv_lock);
Expand All @@ -112,7 +123,7 @@ int main(int argc, char** argv) {

// Create a new group with ourself as the only member
if (0 < SISL_OPTIONS.count("create")) {
messaging.create_group(SISL_OPTIONS["create"].as< std::string >(), "test_package");
app->manager_->create_group(SISL_OPTIONS["create"].as< std::string >(), "test_package");
}

// Just prevent main() from exiting, require a SIGNAL
Expand Down
2 changes: 1 addition & 1 deletion test_package/example_state_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <nuraft_mesg/mesg_service.hpp>
#include <nuraft_mesg/mesg_state_mgr.hpp>
#include <sisl/logging/logging.h>

class simple_state_mgr : public nuraft_mesg::mesg_state_mgr {
Expand Down

0 comments on commit 824b779

Please sign in to comment.