Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove confusing indirection. #49

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ ManagerImpl::~ManagerImpl() {
}
}

ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app,
bool and_data_svc) :
ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app) :
start_params_(start_params), _srv_id(to_server_id(start_params_.server_uuid_)), application_(app) {
_g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app);
auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_);
Expand All @@ -76,19 +75,13 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes
nuraft::asio_service::options service_options;
service_options.thread_pool_size_ = 1;
_scheduler = std::make_shared< nuraft::asio_service >(service_options, logger);
}

// The function passed to msg_service will be called each time a new group is joined,
// allowing sharing of the Server and client amongst raft instances.

_mesg_service = msg_service::create(
[this](int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, nuraft::context*& ctx,
std::shared_ptr< group_metrics > metrics) mutable -> nuraft::cmd_result_code {
return this->group_init(srv_id, group_id, group_type, ctx, metrics);
},
start_params_.server_uuid_, and_data_svc);
_mesg_service->setDefaultGroupType(start_params_.default_group_type_);

// Start a gRPC server and create and associate nuraft_mesg services.
void ManagerImpl::start(bool and_data_svc) {
if (auto lg = std::lock_guard< std::mutex >(_manager_lock); !_mesg_service) {
_mesg_service = msg_service::create(shared_from_this(), start_params_.server_uuid_, and_data_svc);
_mesg_service->setDefaultGroupType(start_params_.default_group_type_);
}
restart_server();
}

Expand All @@ -97,6 +90,7 @@ void ManagerImpl::restart_server() {
LOGI("Starting Messaging Service on http://{}", listen_address);

std::lock_guard< std::mutex > lg(_manager_lock);
RELEASE_ASSERT(_mesg_service, "Need to call ::start() first!");
_grpc_server.reset();
_grpc_server = std::unique_ptr< sisl::GrpcServer >(
sisl::GrpcServer::make(listen_address, start_params_.token_verifier_, grpc_server_threads,
Expand All @@ -115,8 +109,7 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params
if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); }
}

nuraft::cb_func::ReturnCode ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
void ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param) {
switch (type) {
case nuraft::cb_func::RemovedFromCluster: {
LOGI("[srv_id={}] evicted from: [group={}]", start_params_.server_uuid_, group_id);
Expand Down Expand Up @@ -154,7 +147,6 @@ nuraft::cb_func::ReturnCode ManagerImpl::raft_event(group_id_t const& group_id,
default:
break;
};
return nuraft::cb_func::Ok;
}

void ManagerImpl::exit_group(group_id_t const& group_id) {
Expand Down Expand Up @@ -207,8 +199,10 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t

nuraft::ptr< nuraft::logger > logger = std::make_shared< nuraft_mesg_logger >(group_id, _custom_logger);
ctx = new nuraft::context(smgr, sm, listener, logger, rpc_cli_factory, _scheduler, params);
ctx->set_cb_func([this, group_id](nuraft::cb_func::Type type, nuraft::cb_func::Param* param) mutable {
return this->raft_event(group_id, type, param);
ctx->set_cb_func([wp = std::weak_ptr< ManagerImpl >(shared_from_this()), group_id](nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
if (auto sp = wp.lock(); sp) sp->raft_event(group_id, type, param);
return nuraft::cb_func::Ok;
});

return nuraft::cmd_result_code::OK;
Expand Down Expand Up @@ -275,7 +269,7 @@ NullAsyncResult ManagerImpl::create_group(group_id_t const& group_id, std::strin
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader.insert(std::make_pair(group_id, false));
}
if (auto const err = _mesg_service->createRaftGroup(_srv_id, group_id, group_type_name); err) {
if (auto const err = _mesg_service->joinRaftGroup(_srv_id, group_id, group_type_name); err) {
return folly::makeUnexpected(err);
}

Expand Down Expand Up @@ -354,6 +348,7 @@ void ManagerImpl::get_srv_config_all(group_id_t const& group_id,

bool ManagerImpl::bind_data_service_request(std::string const& request_name, group_id_t const& group_id,
data_service_request_handler_t const& request_handler) {
RELEASE_ASSERT(_mesg_service, "Need to call ::start() first!");
return _mesg_service->bind_data_service_request(request_name, group_id, request_handler);
}

Expand All @@ -364,7 +359,9 @@ void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_fa
std::shared_ptr< Manager > init_messaging(Manager::Params const& p, std::weak_ptr< MessagingApplication > w,
bool with_data_svc) {
RELEASE_ASSERT(w.lock(), "Could not acquire application!");
return std::make_shared< ManagerImpl >(p, w, with_data_svc);
auto m = std::make_shared< ManagerImpl >(p, w);
m->start(with_data_svc);
return m;
}

} // namespace nuraft_mesg
14 changes: 8 additions & 6 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <libnuraft/async.hxx>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>

Expand All @@ -36,7 +37,7 @@ class group_factory;
class msg_service;
class group_metrics;

class ManagerImpl : public Manager {
class ManagerImpl : public Manager, public std::enable_shared_from_this< ManagerImpl > {
Manager::Params start_params_;
int32_t _srv_id;

Expand All @@ -56,20 +57,20 @@ class ManagerImpl : public Manager {
nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler;
std::shared_ptr< sisl::logging::logger_t > _custom_logger;

nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);
nuraft::cb_func::ReturnCode raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
void raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param);
void exit_group(group_id_t const& group_id);

public:
ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >);
~ManagerImpl() override;

int32_t server_id() const override { return _srv_id; }

void register_mgr_type(group_type_t const& group_type, group_params const&) override;

nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);

std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_id_t const& group_id) const override;

NullAsyncResult create_group(group_id_t const& group_id, group_type_t const& group_type) override;
Expand All @@ -85,6 +86,7 @@ class ManagerImpl : public Manager {
void leave_group(group_id_t const& group_id) override;
uint32_t logstore_id(group_id_t const& group_id) const override;
void append_peers(group_id_t const& group_id, std::list< peer_id_t >&) const override;
void start(bool and_data_svc);
void restart_server() override;

// data service APIs
Expand Down
6 changes: 3 additions & 3 deletions src/lib/proto/proto_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs
return true;
}

std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, group_id_t const& service_address,
bool const enable_data_service) {
return std::make_shared< proto_service >(get_server_ctx, service_address, enable_data_service);
std::shared_ptr< msg_service > msg_service::create(std::shared_ptr< ManagerImpl > const& manager,
group_id_t const& service_address, bool const enable_data_service) {
return std::make_shared< proto_service >(manager, service_address, enable_data_service);
}

} // namespace nuraft_mesg
17 changes: 10 additions & 7 deletions src/lib/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ grpc_server_wrapper::grpc_server_wrapper(group_id_t const& group_id) {
if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_id);
}

msg_service::msg_service(get_server_ctx_cb get_server_ctx, group_id_t const& service_address,
msg_service::msg_service(std::shared_ptr< ManagerImpl > const& manager, group_id_t const& service_address,
bool const enable_data_service) :
_data_service_enabled(enable_data_service),
_get_server_ctx(get_server_ctx),
_service_address(service_address) {}
_data_service_enabled(enable_data_service), _manager(manager), _service_address(service_address) {}

msg_service::~msg_service() {
std::unique_lock< lock_type > lck(_raft_servers_lock);
Expand Down Expand Up @@ -179,9 +177,14 @@ nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_i
if (g_type.empty()) { g_type = _default_group_type; }
std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_id, group_id));
if (_raft_servers.end() != it && happened) {
if (auto err = _get_server_ctx(srv_id, group_id, g_type, ctx, it->second.m_metrics); err) {
LOGE("Error during RAFT server creation [group={}]: {}", group_id, err);
return err;
if (auto mgr = _manager.lock(); !mgr) {
LOGW("Got join after shutdown...skipping [group={}]", group_id);
return nuraft::cmd_result_code::CANCELLED;
} else {
if (auto err = mgr->group_init(srv_id, group_id, g_type, ctx, it->second.m_metrics); err) {
LOGE("Error during RAFT server creation [group={}]: {}", group_id, err);
return err;
}
}
DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!");
auto new_listner = std::make_shared< msg_group_listner >(shared_from_this(), group_id);
Expand Down
18 changes: 5 additions & 13 deletions src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ class group_metrics : public sisl::MetricsGroupWrapper {
~group_metrics() { deregister_me_from_farm(); }
};

using get_server_ctx_cb =
std::function< nuraft::cmd_result_code(int32_t srv_id, group_id_t const&, group_type_t const&,
nuraft::context*& ctx_out, std::shared_ptr< group_metrics > metrics) >;

// pluggable type for data service
using data_service_t = data_service_grpc;

Expand All @@ -51,18 +47,19 @@ class msg_service : public std::enable_shared_from_this< msg_service >, public n
bool _data_service_enabled;
data_service_t _data_service;
std::string _default_group_type;
std::weak_ptr< ManagerImpl > _manager;

protected:
get_server_ctx_cb _get_server_ctx;
lock_type _raft_servers_lock;
std::map< group_id_t, grpc_server_wrapper > _raft_servers;
peer_id_t const _service_address;

public:
msg_service(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service);
msg_service(std::shared_ptr< ManagerImpl > const& manager, peer_id_t const& service_address,
bool const enable_data_service);
virtual ~msg_service();
static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address,
bool const enable_data_service);
static std::shared_ptr< msg_service > create(std::shared_ptr< ManagerImpl > const& manager,
peer_id_t const& service_address, bool const enable_data_service);

msg_service(msg_service const&) = delete;
msg_service& operator=(msg_service const&) = delete;
Expand All @@ -85,11 +82,6 @@ class msg_service : public std::enable_shared_from_this< msg_service >, public n

void setDefaultGroupType(std::string const& _type);

nuraft::cmd_result_code createRaftGroup(int const srv_id, group_id_t const& group_id,
group_type_t const& group_type) {
return joinRaftGroup(srv_id, group_id, group_type);
}

nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&);

void partRaftGroup(group_id_t const& group_id);
Expand Down