Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an API to schedule snapshot creation #483

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ We welcome contributions. If you find any bugs, potential flaws and edge cases,

Contact
-------
* Jung-Sang Ahn <[email protected]>
* Gene Zhang <[email protected]>
* Jung-Sang Ahn <[email protected]>


License Information
Expand Down
30 changes: 29 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,27 @@ public:
* Manually create a snapshot based on the latest committed
* log index of the state machine.
*
* Note that snapshot creation will fail immediately if the previous
* snapshot task is still running.
*
* @return Log index number of the created snapshot or `0` if failed.
*/
ulong create_snapshot();

/**
* Manually and asynchronously create a snapshot on the next earliest
* available committed log index.
*
* Unlike `create_snapshot`, if the previous snapshot task is running,
* it will wait until the previous task is done. Once the snapshot
* creation is finished, it will be notified via the returned
* `cmd_result` with the log index number of the snapshot.
*
* @return `cmd_result` instance.
* `nullptr` if there is already a scheduled snapshot creation.
*/
ptr< cmd_result<uint64_t> > schedule_snapshot_creation();

/**
* Get the log index number of the last snapshot.
*
Expand Down Expand Up @@ -954,7 +971,8 @@ protected:
void invite_srv_to_join_cluster();
void rm_srv_from_cluster(int32 srv_id);
int get_snapshot_sync_block_size() const;
void on_snapshot_completed(ptr<snapshot>& s,
void on_snapshot_completed(ptr<snapshot> s,
ptr<cmd_result<uint64_t>> manual_creation_cb,
bool result,
ptr<std::exception>& err);
void on_log_compacted(ulong log_idx,
Expand Down Expand Up @@ -1247,6 +1265,16 @@ protected:
*/
std::atomic<bool> snp_in_progress_;

/**
* `true` if a manual snapshot creation is scheduled by the user.
*/
std::atomic<bool> snp_creation_scheduled_;

/**
* Non-null if a manual snapshot creation is scheduled by the user.
*/
ptr< cmd_result<uint64_t> > sched_snp_creation_result_;

/**
* (Read-only, but its contents will change)
* Server context.
Expand Down
42 changes: 40 additions & 2 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,18 @@ ulong raft_server::create_snapshot() {
return snapshot_and_compact(committed_idx, true) ? committed_idx : 0;
}

/**
 * Schedule a manual snapshot creation whose outcome is reported
 * through the returned `cmd_result`.
 *
 * Only one scheduled creation can be pending at a time: the
 * `snp_creation_scheduled_` flag is claimed atomically, and a second
 * caller gets `nilptr` until the flag is cleared.
 *
 * @return `cmd_result` instance that will receive the snapshot's
 *         log index, or `nilptr` if a creation is already scheduled.
 */
ptr< cmd_result<uint64_t> > raft_server::schedule_snapshot_creation() {
    bool exp = false;
    if (!snp_creation_scheduled_.compare_exchange_strong(exp, true)) {
        p_wn("snapshot creation is already scheduled");
        return nilptr;
    }

    // Hold the result in a local and return that local rather than
    // re-reading the member: `on_snapshot_completed` resets
    // `sched_snp_creation_result_`, so if a snapshot finishes between the
    // assignment below and the return, re-reading the member would hand the
    // caller a null pointer that looks like "already scheduled".
    // NOTE(review): the member itself is still written here without a lock;
    // confirm whether readers on other threads need extra synchronization.
    ptr< cmd_result<uint64_t> > result = cs_new< cmd_result<uint64_t> >();
    sched_snp_creation_result_ = result;
    p_in("schedule snapshot creation");
    return result;
}

ulong raft_server::get_last_snapshot_idx() const {
std::lock_guard<std::mutex> l(last_snapshot_lock_);
return last_snapshot_ ? last_snapshot_->get_last_log_idx(): 0;
Expand All @@ -494,7 +506,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
snapshot_distance = first_snapshot_distance_;
}

if (!forced_creation) {
if (!forced_creation && !snp_creation_scheduled_) {
// If `forced_creation == true`, ignore below conditions.
if ( params->snapshot_distance_ == 0 ||
( committed_idx - log_store_->start_index() + 1 ) < snapshot_distance ) {
Expand Down Expand Up @@ -522,6 +534,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
}

if ( ( forced_creation ||
snp_creation_scheduled_ ||
!local_snp ||
committed_idx >= snapshot_distance + local_snp->get_last_log_idx() ) &&
snp_in_progress_.compare_exchange_strong(f, true) )
Expand All @@ -546,6 +559,14 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
return false;
}

ptr<cmd_result<uint64_t>> manual_creation_cb = nullptr;
if (snp_creation_scheduled_) {
// User scheduled a new snapshot creation.
// Due to `snp_in_progress_` it will happen only once.
manual_creation_cb = sched_snp_creation_result_;
p_in("snapshot creation is scheduled by user");
}

while ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() >= log_store_->start_index() ) {
ptr<log_entry> conf_log
Expand Down Expand Up @@ -593,6 +614,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
std::bind( &raft_server::on_snapshot_completed,
this,
new_snapshot,
manual_creation_cb,
std::placeholders::_1,
std::placeholders::_2 );
timer_helper tt;
Expand All @@ -618,7 +640,10 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
}

void raft_server::on_snapshot_completed
( ptr<snapshot>& s, bool result, ptr<std::exception>& err )
( ptr<snapshot> s,
ptr<cmd_result<uint64_t>> manual_creation_cb,
bool result,
ptr<std::exception>& err )
{
do { // Dummy loop
if (err != nilptr) {
Expand Down Expand Up @@ -660,6 +685,19 @@ void raft_server::on_snapshot_completed
}
} while (false);

if (manual_creation_cb.get()) {
// This was a manual request scheduled by the user.
uint64_t idx = 0;
cmd_result_code code = cmd_result_code::FAILED;
if (err == nilptr && result) {
idx = s->get_last_log_idx();
code = cmd_result_code::OK;
}
manual_creation_cb->set_result(idx, err, code);
sched_snp_creation_result_.reset();
snp_creation_scheduled_ = false;
}

snp_in_progress_.store(false);
}

Expand Down
16 changes: 14 additions & 2 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ raft_server::raft_server(context* ctx, const init_options& opt)
, serving_req_(false)
, steps_to_down_(0)
, snp_in_progress_(false)
, snp_creation_scheduled_(false)
, sched_snp_creation_result_(nullptr)
, ctx_(ctx)
, scheduler_(ctx->scheduler_)
, election_exec_(std::bind(&raft_server::handle_election_timeout, this))
Expand Down Expand Up @@ -1120,14 +1122,17 @@ void raft_server::check_leadership_transfer() {
ptr<raft_params> params = ctx_->get_params();
if (!params->leadership_transfer_min_wait_time_) {
// Transferring leadership is disabled.
p_tr("leadership transfer is disabled");
return;
}
if (!leadership_transfer_timer_.timeout()) {
// Leadership period is too short.
p_tr("leadership period is too short: %zu ms",
leadership_transfer_timer_.get_duration_us() / 1000);
return;
}

size_t hb_interval_ms = ctx_->get_params()->heart_beat_interval_;
size_t election_lower = ctx_->get_params()->election_timeout_lower_bound_;

recur_lock(lock_);

Expand All @@ -1146,24 +1151,31 @@ void raft_server::check_leadership_transfer() {
if (peer_elem->get_matched_idx() + params->stale_log_gap_ <
cur_commit_idx) {
// This peer is lagging behind.
p_tr("peer %d is lagging behind, %lu < %lu",
s_conf.get_id(), peer_elem->get_matched_idx(),
cur_commit_idx);
return;
}

uint64_t last_resp_ms = peer_elem->get_resp_timer_us() / 1000;
if (last_resp_ms > hb_interval_ms) {
if (last_resp_ms > election_lower) {
// This replica is not responding.
p_tr("peer %d is not responding, %lu ms ago",
s_conf.get_id(), last_resp_ms);
return;
}
}

if (my_priority_ >= max_priority || successor_id == -1) {
// This leader already has the highest priority.
p_tr("my priority %d is already the highest", my_priority_);
return;
}

if (!state_machine_->allow_leadership_transfer()) {
// Although all conditions are met,
// user does not want to transfer the leadership.
p_tr("state machine does not allow leadership transfer");
return;
}

Expand Down
107 changes: 107 additions & 0 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,110 @@ int snapshot_creation_index_inversion_test() {
return 0;
}

// Unit test for `raft_server::schedule_snapshot_creation()`.
//
// Flow: build a 3-server group, commit `NUM` logs, create a snapshot
// manually, then schedule another snapshot creation. The test checks that
// the scheduled request has no result while no new logs arrive, and that
// after `NUM` more logs are committed the result is available and equals
// `last_idx + 1`.
//
// Returns 0 on success; the `CHK_*` macros are presumably responsible for
// returning a non-zero value on failure (defined outside this view).
int snapshot_scheduled_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";

// Three servers sharing the same fake network; S1 is the one driven below.
RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Append a message using separate thread.
ExecArgs exec_args(&s1);
TestSuite::ThreadHolder hh(&exec_args, fake_executer, fake_executer_killer);

// Switch every server to the async-handler return method.
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

// Number of messages appended in each of the two batches below.
const size_t NUM = 5;

// Append messages asynchronously.
// NOTE(review): `handlers` only collects the returned results; their
// values are never inspected in this test.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
for (size_t ii = 0; ii < NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

s1.fNet->execReqResp(); // replication.
s1.fNet->execReqResp(); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Manually create a snapshot.
uint64_t log_idx = s1.raftServer->create_snapshot();
CHK_GT(log_idx, 0);

// Schedule snapshot creation and wait 500ms, there shouldn't be any progress.
// NOTE(review): `sched_ret` is dereferenced without a null check; the test
// relies on no other snapshot creation being scheduled at this point.
auto sched_ret = s1.raftServer->schedule_snapshot_creation();
TestSuite::sleep_ms(500, "wait for async snapshot creation");
CHK_FALSE(sched_ret->has_result());

// Remember the last log index before the second batch; the scheduled
// snapshot is expected to land on the index right after it.
uint64_t last_idx = s1.raftServer->get_last_log_idx();

// Append more messages asynchronously.
for (size_t ii = NUM; ii < NUM * 2; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

s1.fNet->execReqResp(); // replication.
s1.fNet->execReqResp(); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Now it should have the result.
// The expected index is `last_idx + 1`, i.e. the first log of the second
// batch -- presumably the first commit seen after scheduling; confirm
// against `snapshot_and_compact`.
CHK_TRUE(sched_ret->has_result());
CHK_EQ(last_idx + 1, sched_ret->get());

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();

// Stop the helper thread started above and check its result.
fake_executer_killer(&exec_args);
hh.join();
CHK_Z( hh.getResult() );

f_base->destroy();

return 0;
}

int snapshot_randomized_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -3387,6 +3491,9 @@ int main(int argc, char** argv) {
ts.doTest( "snapshot creation index inversion test",
snapshot_creation_index_inversion_test );

ts.doTest( "snapshot scheduled creation test",
snapshot_scheduled_creation_test );

ts.doTest( "snapshot randomized creation test",
snapshot_randomized_creation_test );

Expand Down
Loading