Skip to content

Commit

Permalink
Merge pull request #75 from hkadayam/pass_event_handler
Browse files Browse the repository at this point in the history
Allow the client of nuraft_mesg to provide a raft event handler
  • Loading branch information
hkadayam authored Mar 6, 2024
2 parents d70fede + aec9515 commit b9566ad
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "3.0.1"
version = "3.1.0"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
12 changes: 12 additions & 0 deletions include/nuraft_mesg/mesg_state_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <libnuraft/state_mgr.hxx>
#include <libnuraft/callback.hxx>

#include "common.hpp"

Expand All @@ -21,6 +22,7 @@ namespace nuraft_mesg {

class mesg_factory;
class grpc_server;
class ManagerImpl;

// config for a replica with after the int32_t id is transformed to a peer_id_t
struct replica_config {
Expand Down Expand Up @@ -70,14 +72,24 @@ class mesg_state_mgr : public nuraft::state_mgr {
virtual ~mesg_state_mgr() = default;
void make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory);

virtual void set_manager_impl(std::weak_ptr< ManagerImpl > manager) { m_manager = manager; }
virtual void become_ready() {}
virtual uint32_t get_logstore_id() const = 0;
virtual std::shared_ptr< nuraft::state_machine > get_state_machine() = 0;
virtual void permanent_destroy() = 0;
virtual void leave() = 0;

virtual std::pair< bool, nuraft::cb_func::ReturnCode > handle_raft_event(nuraft::cb_func::Type,
nuraft::cb_func::Param*) {
return std::pair(false, nuraft::cb_func::ReturnCode::Ok);
}

nuraft::cb_func::ReturnCode internal_raft_event_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);

protected:
std::unique_ptr< repl_service_ctx > m_repl_svc_ctx;
std::weak_ptr< ManagerImpl > m_manager;
};

} // namespace nuraft_mesg
28 changes: 20 additions & 8 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params
if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); }
}

void ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param) {
nuraft::cb_func::ReturnCode ManagerImpl::generic_raft_event_handler(group_id_t const& group_id,
nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
switch (type) {
case nuraft::cb_func::RemovedFromCluster: {
LOGI("[srv_id={}] evicted from: [group={}]", start_params_.server_uuid_, group_id);
Expand Down Expand Up @@ -149,6 +151,7 @@ void ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type t
default:
break;
};
return nuraft::cb_func::ReturnCode::Ok;
}

void ManagerImpl::exit_group(group_id_t const& group_id) {
Expand All @@ -166,7 +169,7 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t
LOGD("Creating context for: [group_id={}] as Member: {}", group_id, srv_id);

// State manager (RAFT log store, config)
std::shared_ptr< nuraft::state_mgr > smgr;
std::shared_ptr< mesg_state_mgr > smgr;
std::shared_ptr< nuraft::state_machine > sm;
nuraft::raft_params params;
{
Expand All @@ -184,9 +187,10 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t
LOGD("Creating new State Manager for: [group={}], type: {}", group_id, group_type);
it->second = application_.lock()->create_state_mgr(srv_id, group_id);
}
it->second->become_ready();
sm = it->second->get_state_machine();
smgr = it->second;
smgr->become_ready();
sm = smgr->get_state_machine();
smgr->set_manager_impl(shared_from_this());
} else {
return nuraft::cmd_result_code::CANCELLED;
}
Expand All @@ -200,10 +204,11 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t
std::shared_ptr< nuraft::rpc_listener > listener;

nuraft::ptr< nuraft::logger > logger = std::make_shared< nuraft_mesg_logger >(group_id, _custom_logger);
ctx = new nuraft::context(smgr, sm, listener, logger, rpc_cli_factory, _scheduler, params);
ctx->set_cb_func([wp = std::weak_ptr< ManagerImpl >(shared_from_this()), group_id](nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
if (auto sp = wp.lock(); sp) sp->raft_event(group_id, type, param);
auto base_smgr = std::static_pointer_cast< nuraft::state_mgr >(smgr);
ctx = new nuraft::context(base_smgr, sm, listener, logger, rpc_cli_factory, _scheduler, params);
ctx->set_cb_func([wp = std::weak_ptr< mesg_state_mgr >(smgr), group_id](nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
if (auto sp = wp.lock(); sp) { return sp->internal_raft_event_handler(group_id, type, param); }
return nuraft::cb_func::Ok;
});

Expand Down Expand Up @@ -357,6 +362,13 @@ void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_fa
m_repl_svc_ctx = std::make_unique< repl_service_ctx_grpc >(server, cli_factory);
}

nuraft::cb_func::ReturnCode mesg_state_mgr::internal_raft_event_handler(group_id_t const& group_id,
nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
if (auto const [handled, ret] = handle_raft_event(type, param); handled) { return ret; }
return m_manager.lock()->generic_raft_event_handler(group_id, type, param);
}

std::shared_ptr< Manager > init_messaging(Manager::Params const& p, std::weak_ptr< MessagingApplication > w,
bool with_data_svc) {
RELEASE_ASSERT(w.lock(), "Could not acquire application!");
Expand Down
4 changes: 3 additions & 1 deletion src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class ManagerImpl : public Manager, public std::enable_shared_from_this< Manager
nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler;
std::shared_ptr< sisl::logging::logger_t > _custom_logger;

void raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param);
void exit_group(group_id_t const& group_id);

public:
Expand Down Expand Up @@ -93,6 +92,9 @@ class ManagerImpl : public Manager, public std::enable_shared_from_this< Manager
nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);
void start(bool and_data_svc);
nuraft::cb_func::ReturnCode generic_raft_event_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);

//
};

Expand Down

0 comments on commit b9566ad

Please sign in to comment.