From e1d7f4edea175aaa7651e6f2542ca9901ba4340f Mon Sep 17 00:00:00 2001 From: Jung-Sang Ahn Date: Tue, 13 Feb 2024 00:16:53 -0800 Subject: [PATCH] Add an API to schedule snapshot creation * Added `schedule_snapshot_creation()` API to manually create a snapshot. * Unlike `create_snapshot()`, if snapshot creation is already in progress, it will wait and create another snapshot on the next available log index number. --- README.md | 3 +- include/libnuraft/raft_server.hxx | 30 ++++++++- src/handle_commit.cxx | 42 +++++++++++- src/raft_server.cxx | 16 ++++- tests/unit/raft_server_test.cxx | 107 ++++++++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f27cfe8c..1f1f5d27 100644 --- a/README.md +++ b/README.md @@ -135,8 +135,7 @@ We welcome contributions. If you find any bugs, potential flaws and edge cases, Contact ------- -* Jung-Sang Ahn -* Gene Zhang +* Jung-Sang Ahn License Information diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 6862639c..70faf49d 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -797,10 +797,27 @@ public: * Manually create a snapshot based on the latest committed * log index of the state machine. * + * Note that snapshot creation will fail immediately if the previous + * snapshot task is still running. + * * @return Log index number of the created snapshot or`0` if failed. */ ulong create_snapshot(); + /** + * Manually and asynchronously create a snapshot on the next earliest + * available committed log index. + * + * Unlike `create_snapshot`, if the previous snapshot task is running, + * it will wait until the previous task is done. Once the snapshot + * creation is finished, it will be notified via the returned + * `cmd_result` with the log index number of the snapshot. + * + * @return `cmd_result` instance. + * `nullptr` if there is already a scheduled snapshot creation. 
+ */ + ptr< cmd_result > schedule_snapshot_creation(); + /** * Get the log index number of the last snapshot. * @@ -954,7 +971,8 @@ protected: void invite_srv_to_join_cluster(); void rm_srv_from_cluster(int32 srv_id); int get_snapshot_sync_block_size() const; - void on_snapshot_completed(ptr& s, + void on_snapshot_completed(ptr s, + ptr> manual_creation_cb, bool result, ptr& err); void on_log_compacted(ulong log_idx, @@ -1247,6 +1265,16 @@ protected: */ std::atomic snp_in_progress_; + /** + * `true` if a manual snapshot creation is scheduled by the user. + */ + std::atomic snp_creation_scheduled_; + + /** + * Non-null if a manual snapshot creation is scheduled by the user. + */ + ptr< cmd_result > sched_snp_creation_result_; + /** * (Read-only, but its contents will change) * Server context. diff --git a/src/handle_commit.cxx b/src/handle_commit.cxx index bec18914..9fefc15e 100644 --- a/src/handle_commit.cxx +++ b/src/handle_commit.cxx @@ -469,6 +469,18 @@ ulong raft_server::create_snapshot() { return snapshot_and_compact(committed_idx, true) ? committed_idx : 0; } +ptr< cmd_result > raft_server::schedule_snapshot_creation() { + bool exp = false; + if (!snp_creation_scheduled_.compare_exchange_strong(exp, true)) { + p_wn("snapshot creation is already scheduled"); + return nilptr; + } + + sched_snp_creation_result_ = cs_new>(); + p_in("schedule snapshot creation"); + return sched_snp_creation_result_; +} + ulong raft_server::get_last_snapshot_idx() const { std::lock_guard l(last_snapshot_lock_); return last_snapshot_ ? last_snapshot_->get_last_log_idx(): 0; @@ -494,7 +506,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation snapshot_distance = first_snapshot_distance_; } - if (!forced_creation) { + if (!forced_creation && !snp_creation_scheduled_) { // If `forced_creation == true`, ignore below conditions. 
if ( params->snapshot_distance_ == 0 || ( committed_idx - log_store_->start_index() + 1 ) < snapshot_distance ) { @@ -522,6 +534,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation } if ( ( forced_creation || + snp_creation_scheduled_ || !local_snp || committed_idx >= snapshot_distance + local_snp->get_last_log_idx() ) && snp_in_progress_.compare_exchange_strong(f, true) ) @@ -546,6 +559,14 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation return false; } + ptr> manual_creation_cb = nullptr; + if (snp_creation_scheduled_) { + // User scheduled a new snapshot creation. + // Due to `snp_in_progress_` it will happen only once. + manual_creation_cb = sched_snp_creation_result_; + p_in("snapshot creation is scheduled by user"); + } + while ( conf->get_log_idx() > committed_idx && conf->get_prev_log_idx() >= log_store_->start_index() ) { ptr conf_log @@ -593,6 +614,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation std::bind( &raft_server::on_snapshot_completed, this, new_snapshot, + manual_creation_cb, std::placeholders::_1, std::placeholders::_2 ); timer_helper tt; @@ -618,7 +640,10 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation } void raft_server::on_snapshot_completed - ( ptr& s, bool result, ptr& err ) + ( ptr s, + ptr> manual_creation_cb, + bool result, + ptr& err ) { do { // Dummy loop if (err != nilptr) { @@ -660,6 +685,19 @@ void raft_server::on_snapshot_completed } } while (false); + if (manual_creation_cb.get()) { + // This was a manual request scheduled by the user. 
+ uint64_t idx = 0; + cmd_result_code code = cmd_result_code::FAILED; + if (err == nilptr && result) { + idx = s->get_last_log_idx(); + code = cmd_result_code::OK; + } + manual_creation_cb->set_result(idx, err, code); + sched_snp_creation_result_.reset(); + snp_creation_scheduled_ = false; + } + snp_in_progress_.store(false); } diff --git a/src/raft_server.cxx b/src/raft_server.cxx index d3b2d9a2..56539315 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -78,6 +78,8 @@ raft_server::raft_server(context* ctx, const init_options& opt) , serving_req_(false) , steps_to_down_(0) , snp_in_progress_(false) + , snp_creation_scheduled_(false) + , sched_snp_creation_result_(nullptr) , ctx_(ctx) , scheduler_(ctx->scheduler_) , election_exec_(std::bind(&raft_server::handle_election_timeout, this)) @@ -1120,14 +1122,17 @@ void raft_server::check_leadership_transfer() { ptr params = ctx_->get_params(); if (!params->leadership_transfer_min_wait_time_) { // Transferring leadership is disabled. + p_tr("leadership transfer is disabled"); return; } if (!leadership_transfer_timer_.timeout()) { // Leadership period is too short. + p_tr("leadership period is too short: %zu ms", + leadership_transfer_timer_.get_duration_us() / 1000); return; } - size_t hb_interval_ms = ctx_->get_params()->heart_beat_interval_; + size_t election_lower = ctx_->get_params()->election_timeout_lower_bound_; recur_lock(lock_); @@ -1146,24 +1151,31 @@ void raft_server::check_leadership_transfer() { if (peer_elem->get_matched_idx() + params->stale_log_gap_ < cur_commit_idx) { // This peer is lagging behind. + p_tr("peer %d is lagging behind, %lu < %lu", + s_conf.get_id(), peer_elem->get_matched_idx(), + cur_commit_idx); return; } uint64_t last_resp_ms = peer_elem->get_resp_timer_us() / 1000; - if (last_resp_ms > hb_interval_ms) { + if (last_resp_ms > election_lower) { // This replica is not responding. 
+ p_tr("peer %d is not responding, %lu ms ago", + s_conf.get_id(), last_resp_ms); return; } } if (my_priority_ >= max_priority || successor_id == -1) { // This leader already has the highest priority. + p_tr("my priority %d is already the highest", my_priority_); return; } if (!state_machine_->allow_leadership_transfer()) { // Although all conditions are met, // user does not want to transfer the leadership. + p_tr("state machine does not allow leadership transfer"); return; } diff --git a/tests/unit/raft_server_test.cxx b/tests/unit/raft_server_test.cxx index c4f19026..3dcbafe4 100644 --- a/tests/unit/raft_server_test.cxx +++ b/tests/unit/raft_server_test.cxx @@ -2238,6 +2238,110 @@ int snapshot_creation_index_inversion_test() { return 0; } +int snapshot_scheduled_creation_test() { + reset_log_files(); + ptr f_base = cs_new(); + + std::string s1_addr = "S1"; + std::string s2_addr = "S2"; + std::string s3_addr = "S3"; + + RaftPkg s1(f_base, 1, s1_addr); + RaftPkg s2(f_base, 2, s2_addr); + RaftPkg s3(f_base, 3, s3_addr); + std::vector pkgs = {&s1, &s2, &s3}; + + CHK_Z( launch_servers( pkgs ) ); + CHK_Z( make_group( pkgs ) ); + + // Append a message using separate thread. + ExecArgs exec_args(&s1); + TestSuite::ThreadHolder hh(&exec_args, fake_executer, fake_executer_killer); + + for (auto& entry: pkgs) { + RaftPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + pp->raftServer->update_params(param); + } + + const size_t NUM = 5; + + // Append messages asynchronously. + std::list< ptr< cmd_result< ptr > > > handlers; + for (size_t ii = 0; ii < NUM; ++ii) { + std::string test_msg = "test" + std::to_string(ii); + ptr msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + s1.raftServer->append_entries( {msg} ); + + CHK_TRUE( ret->get_accepted() ); + + handlers.push_back(ret); + } + + s1.fNet->execReqResp(); // replication. 
+ s1.fNet->execReqResp(); // commit. + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution. + + // One more time to make sure. + s1.fNet->execReqResp(); + s1.fNet->execReqResp(); + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); + + // Manually create a snapshot. + uint64_t log_idx = s1.raftServer->create_snapshot(); + CHK_GT(log_idx, 0); + + // Schedule snapshot creation and wait 500ms, there shouldn't be any progress. + auto sched_ret = s1.raftServer->schedule_snapshot_creation(); + TestSuite::sleep_ms(500, "wait for async snapshot creation"); + CHK_FALSE(sched_ret->has_result()); + + uint64_t last_idx = s1.raftServer->get_last_log_idx(); + + // Append more messages asynchronously. + for (size_t ii = NUM; ii < NUM * 2; ++ii) { + std::string test_msg = "test" + std::to_string(ii); + ptr msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + s1.raftServer->append_entries( {msg} ); + + CHK_TRUE( ret->get_accepted() ); + + handlers.push_back(ret); + } + + s1.fNet->execReqResp(); // replication. + s1.fNet->execReqResp(); // commit. + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution. + + // One more time to make sure. + s1.fNet->execReqResp(); + s1.fNet->execReqResp(); + CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); + + // Now it should have the result. 
+ CHK_TRUE(sched_ret->has_result()); + CHK_EQ(last_idx + 1, sched_ret->get()); + + print_stats(pkgs); + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + + fake_executer_killer(&exec_args); + hh.join(); + CHK_Z( hh.getResult() ); + + f_base->destroy(); + + return 0; +} + int snapshot_randomized_creation_test() { reset_log_files(); ptr f_base = cs_new(); @@ -3387,6 +3491,9 @@ int main(int argc, char** argv) { ts.doTest( "snapshot creation index inversion test", snapshot_creation_index_inversion_test ); + ts.doTest( "snapshot scheduled creation test", + snapshot_scheduled_creation_test ); + ts.doTest( "snapshot randomized creation test", snapshot_randomized_creation_test );