Skip to content

Commit

Permalink
Fix race
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Nov 10, 2023
1 parent 15c6fad commit 25748d6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 44 deletions.
71 changes: 31 additions & 40 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ class null_service final : public grpc_server {
};

class msg_group_listner : public nuraft::rpc_listener {
std::shared_ptr< ManagerImpl > _mgr;
std::weak_ptr< ManagerImpl > _mgr;
group_id_t _group;

public:
msg_group_listner(std::shared_ptr< ManagerImpl > mgr, group_id_t const& group) : _mgr(mgr), _group(group) {}
~msg_group_listner() { _mgr->shutdown_for(_group); }
msg_group_listner(std::weak_ptr< ManagerImpl > mgr, group_id_t const& group) : _mgr(mgr), _group(group) {}
~msg_group_listner() {
if (auto mgr = _mgr.lock(); mgr) mgr->shutdown_for(_group);
}

void listen(nuraft::ptr< nuraft::msg_handler >&) override { LOGI("[group={}]", _group); }
void stop() override { LOGI("[group={}]", _group); }
Expand All @@ -86,10 +88,10 @@ class engine_factory : public group_factory {
public:
std::weak_ptr< MessagingApplication > application_;

engine_factory(int const threads, Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app) :
group_factory::group_factory(threads, start_params.server_uuid_, start_params.token_client_,
start_params.ssl_cert_),
application_(app) {}
engine_factory(int const threads, group_id_t const& name,
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert,
std::weak_ptr< MessagingApplication > app) :
group_factory::group_factory(threads, name, token_client, ssl_cert), application_(app) {}

std::string lookupEndpoint(peer_id_t const& client) override {
LOGT("[peer={}]", client);
Expand All @@ -99,8 +101,8 @@ class engine_factory : public group_factory {
};

ManagerImpl::~ManagerImpl() {
LOGI("[srv_id={}] shutdown started.", start_params_.server_uuid_);
if (_grpc_server) { _grpc_server->shutdown(); }
LOGI("MessagingService shutdown started.");
std::deque< std::shared_ptr< grpc_server > > servers;
{
std::unique_lock< lock_type > lck(_raft_servers_lock);
Expand All @@ -113,12 +115,7 @@ ManagerImpl::~ManagerImpl() {
server->raft_server()->stop_server();
server->raft_server()->shutdown();
}

{
std::unique_lock< lock_type > lck(_raft_servers_lock);
_raft_servers_sync.wait(lck, [this]() { return _raft_servers.empty(); });
}
LOGI("MessagingService shutdown complete.");
LOGI("[srv_id={}] shutdown complete.", start_params_.server_uuid_);
}

void ManagerImpl::shutdown_for(group_id_t const& group_id) {
Expand All @@ -132,7 +129,6 @@ void ManagerImpl::shutdown_for(group_id_t const& group_id) {
return;
}
}
_raft_servers_sync.notify_all();
}

ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app,
Expand All @@ -141,7 +137,8 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes
_srv_id(to_server_id(start_params_.server_uuid_)),
application_(app),
_data_service_enabled(and_data_svc) {
_g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app);
_g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params.server_uuid_,
start_params.token_client_, start_params.ssl_cert_, app);
auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_);
//
// NOTE: The Unit tests require this instance to be recreated with the same parameters.
Expand Down Expand Up @@ -274,36 +271,37 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params
if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); }
}

nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
nuraft::cb_func::ReturnCode ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
switch (type) {
case nuraft::cb_func::RemovedFromCluster: {
LOGI("Removed from cluster [group={}]", group_id);
LOGI("[srv_id={}] evicted from: [group={}]", start_params_.server_uuid_, group_id);
exit_group(group_id);
} break;
case nuraft::cb_func::JoinedCluster: {
auto const my_id = param->myId;
auto const leader_id = param->leaderId;
LOGI("Joined cluster: [group={}], [l_id:{},my_id:{}]", group_id, leader_id, my_id);
LOGI("[srv_id={}] joined: [group={}], [leader_id:{},my_id:{}]", start_params_.server_uuid_, group_id, leader_id,
my_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = (leader_id == my_id);
}
} break;
case nuraft::cb_func::NewConfig: {
LOGD("Cluster change for: [group={}]", group_id);
LOGD("[srv_id={}] saw cluster change: [group={}]", start_params_.server_uuid_, group_id);
_config_change.notify_all();
} break;
case nuraft::cb_func::BecomeLeader: {
LOGD("I'm the leader of: [group={}]!", group_id);
LOGI("[srv_id={}] became leader: [group={}]!", start_params_.server_uuid_, group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = true;
}
_config_change.notify_all();
} break;
case nuraft::cb_func::BecomeFollower: {
LOGI("I'm a follower of: [group={}]!", group_id);
LOGI("[srv_id={}] following: [group={}]!", start_params_.server_uuid_, group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = false;
Expand Down Expand Up @@ -366,7 +364,7 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t
nuraft::ptr< nuraft::logger > logger = std::make_shared< nuraft_mesg_logger >(group_id, _custom_logger);
ctx = new nuraft::context(smgr, sm, listener, logger, rpc_cli_factory, _scheduler, params);
ctx->set_cb_func([this, group_id](nuraft::cb_func::Type type, nuraft::cb_func::Param* param) mutable {
return this->callback_handler(group_id, type, param);
return this->raft_event(group_id, type, param);
});

return nuraft::cmd_result_code::OK;
Expand Down Expand Up @@ -460,7 +458,7 @@ std::shared_ptr< mesg_state_mgr > ManagerImpl::lookup_state_manager(group_id_t c

nuraft::cmd_result_code ManagerImpl::joinRaftGroup(int32_t const srv_id, group_id_t const& group_id,
group_type_t const& group_type) {
LOGI("Joining RAFT [group={}], type: {}", group_id, group_type);
LOGI("[srv_id={}] joining [group={}, type={}]", start_params_.server_uuid_, group_id, group_type);

nuraft::context* ctx{nullptr};
bool happened{false};
Expand Down Expand Up @@ -540,38 +538,31 @@ void ManagerImpl::append_peers(group_id_t const& group_id, std::list< peer_id_t

void ManagerImpl::leave_group(group_id_t const& group_id) {
LOGI("Leaving group [group={}]", group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
if (0 == _state_managers.count(group_id)) {
LOGD("Asked to leave [group={}] which we are not part of!", group_id);
return;
}
}

std::shared_ptr< grpc_server > server;

{
std::unique_lock< lock_type > lck(_raft_servers_lock);
if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) {
server = it->second.m_server;
} else {
LOGW("Unknown [group={}] cannot part.", group_id);
return;
}
}

if (auto raft_server = server->raft_server(); raft_server) {
LOGI("[group={}]", group_id);
raft_server->stop_server();
raft_server->shutdown();
if (server) {
if (auto raft_server = server->raft_server(); raft_server) {
LOGI("[group={}]", group_id);
raft_server->stop_server();
raft_server->shutdown();
}
}

std::lock_guard< std::mutex > lg(_manager_lock);
if (auto it = _state_managers.find(group_id); _state_managers.end() != it) {
// Delete all the state files (RAFT log etc.) after the destructor is called.
it->second->permanent_destroy();
_state_managers.erase(it);
}
} else
LOGW("Unknown [state_mgr={}] cannot destroy.", group_id);

LOGI("Finished leaving: [group={}]", group_id);
}
Expand Down
5 changes: 2 additions & 3 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class ManagerImpl : public Manager, public std::enable_shared_from_this< Manager
std::shared_ptr< group_factory > _g_factory;

std::mutex _raft_sync_lock;
std::condition_variable_any _raft_servers_sync;
folly::SharedMutex _raft_servers_lock;
std::map< group_id_t, grpc_server_wrapper > _raft_servers;

Expand All @@ -74,8 +73,8 @@ class ManagerImpl : public Manager, public std::enable_shared_from_this< Manager

nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);
nuraft::cb_func::ReturnCode callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
nuraft::cb_func::ReturnCode raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
void exit_group(group_id_t const& group_id);

nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&);
Expand Down
3 changes: 2 additions & 1 deletion src/tests/test_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf) {
uint16_t test_state_mgr::get_random_num() {
static std::random_device dev;
static std::mt19937 rng(dev());
std::uniform_int_distribution< std::mt19937::result_type > dist(1001u, 65535u);
// start @ 1024 for non-privileged ports
std::uniform_int_distribution< std::mt19937::result_type > dist(1024u, UINT16_MAX);
return dist(rng);
}

Expand Down

0 comments on commit 25748d6

Please sign in to comment.