Skip to content

Commit

Permalink
Add additional tests for replace member (#574)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay authored Nov 1, 2024
1 parent 60eea4a commit c4efe11
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 20 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.4"
version = "6.5.5"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
10 changes: 10 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <iomgr/iomgr_timer.hpp>
#include <iomgr/iomgr_flip.hpp>
#include <sisl/logging/logging.h>
#include <sisl/fds/utils.hpp>
#include <sisl/fds/vector_pool.hpp>
Expand All @@ -9,6 +10,7 @@
#include "repl_dev/raft_repl_dev.h"
#include <homestore/homestore.hpp>
#include "common/homestore_config.hpp"
#include "common/crash_simulator.hpp"

SISL_LOGGING_DECL(replication)

Expand Down Expand Up @@ -293,6 +295,14 @@ void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id,

// Update the object offset.
obj_id = snp_data->offset;

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("baseline_resync_restart_new_follower")) {
LOGINFO("Hit flip baseline_resync_restart_new_follower crashing");
hs()->crash_simulator().crash();
return;
}
#endif
}

bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
Expand Down
9 changes: 9 additions & 0 deletions src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ class HSReplTestHelper : public HSTestHelper {
start_homestore();
}

void reinit_repl_app() {
m_token.params(HS_SERVICE::REPLICATION).repl_app = std::make_unique< TestReplApplication >(*this);
}

uint16_t replica_num() const { return replica_num_; }
homestore::replica_id_t my_replica_id() const { return my_replica_id_; }
homestore::replica_id_t replica_id(uint16_t member_id) const {
Expand Down Expand Up @@ -317,6 +321,11 @@ class HSReplTestHelper : public HSTestHelper {
}
}

void add_listener(std::shared_ptr< ReplDevListener > listener) {
std::unique_lock lg(groups_mtx_);
pending_listeners_.emplace_back(listener);
}

size_t num_listeners() const {
std::unique_lock lg(groups_mtx_);
return repl_groups_.size();
Expand Down
24 changes: 19 additions & 5 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
kv_snapshot_data.emplace_back(Key{v.id_}, v);
LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}",
g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_);
if (kv_snapshot_data.size() >= 1000) { break; }
if (kv_snapshot_data.size() >= 10) { break; }
}

if (kv_snapshot_data.size() == 0) {
Expand Down Expand Up @@ -430,6 +430,7 @@ class RaftReplDevTestBase : public testing::Test {
for (auto const& db : dbs_) {
if (db->is_zombie()) { continue; }
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
if (!repl_dev) continue;
int i = 0;
bool force_leave = false;
do {
Expand Down Expand Up @@ -511,6 +512,11 @@ class RaftReplDevTestBase : public testing::Test {
}

void run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) {
if (!db || !db->repl_dev()) {
// Spare which are not added to group will not have repl dev.
return;
}

do {
auto leader_uuid = db->repl_dev()->get_leader_id();

Expand All @@ -527,6 +533,8 @@ class RaftReplDevTestBase : public testing::Test {
}

void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) {
if (dbs_[0]->repl_dev() == nullptr) return;

do {
auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id();

Expand Down Expand Up @@ -614,14 +622,20 @@ class RaftReplDevTestBase : public testing::Test {
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }

void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) {
this->run_on_leader(db, [this, db, member_out, member_in, commit_quorum]() {
uint32_t commit_quorum = 0, ReplServiceError error = ReplServiceError::OK) {
this->run_on_leader(db, [this, error, db, member_out, member_in, commit_quorum]() {
LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out),
boost::uuids::to_string(member_in));

replica_member_info out{member_out, ""};
replica_member_info in{member_in, ""};
auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), out, in, commit_quorum).get();
ASSERT_EQ(v.hasError(), false) << "Error in replacing member";
auto result = hs()->repl_service().replace_member(db->repl_dev()->group_id(), out, in, commit_quorum).get();
if (error == ReplServiceError::OK) {
ASSERT_EQ(result.hasError(), false) << "Error in replacing member";
} else {
ASSERT_EQ(result.hasError(), true) << "Error in replacing member";
ASSERT_EQ(result.error(), error);
}
});
}

Expand Down
182 changes: 168 additions & 14 deletions src/tests/test_raft_repl_dev_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
#include "test_common/raft_repl_test_base.hpp"

// Dynamic tests spawn spare replica's also which can be used to add and remove from a repl dev.
class ReplDevDynamicTest : public RaftReplDevTestBase {};
class ReplDevDynamicTest : public RaftReplDevTestBase {
private:
bool is_replica_num_in(const std::set< uint32_t >& replicas) {
// Check if the current replica process is in this set.
return replicas.count(g_helper->replica_num()) != 0 ? true : false;
}
};

TEST_F(ReplDevDynamicTest, ReplaceMember) {
LOGINFO("ReplaceMember test started replica={}", g_helper->replica_num());
// Write some IO's, replace a member, validate all members data except which is out.
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
Expand All @@ -45,28 +51,28 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) {

g_helper->sync_for_verify_start(num_members);
LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num());
if (g_helper->replica_num() != member_out) {
if (is_replica_num_in({0, 1, member_in})) {
// Skip the member which is going to be replaced. Validate data on all other replica's.
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
} else {
} else if (g_helper->replica_num() == member_out) {
// The out member will have the repl dev destroyed.
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
do {
while (repl_dev && !repl_dev->is_destroyed()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
raft_repl_svc.gc_repl_devs();
LOGINFO("Waiting for repl dev to get destroyed on out member replica={}", g_helper->replica_num());
} while (!repl_dev->is_destroyed());
}
LOGINFO("Repl dev destroyed on out member replica={}", g_helper->replica_num());
}

g_helper->sync_for_cleanup_start(num_members);
LOGINFO("ReplaceMember test done");
LOGINFO("ReplaceMember test done replica={}", g_helper->replica_num());
}

TEST_F(ReplDevDynamicTest, TwoMemberDown) {
LOGINFO("TwoMemberDown test started");
LOGINFO("TwoMemberDown test started replica={}", g_helper->replica_num());

// Make two members down in a group and leader cant reach a quorum.
// We set the custom quorum size to 1 and call replace member.
Expand Down Expand Up @@ -110,28 +116,176 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) {
LOGINFO("Member in got all commits");
}

if (g_helper->replica_num() == 0 || g_helper->replica_num() == member_in) {
if (is_replica_num_in({0, member_in})) {
// Validate data on leader replica 0 and replica 3
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
}

g_helper->sync_for_cleanup_start(num_members);

if (g_helper->replica_num() == 1) {
LOGINFO("Start replica 1");
db->set_zombie();
this->start_replica(1);
}
if (g_helper->replica_num() == 2) {
LOGINFO("Start replica 2");
db->set_zombie();
this->start_replica(2);
}

g_helper->sync_for_cleanup_start(num_members);
LOGINFO("TwoMemberDown test done replica={}", g_helper->replica_num());
}

TEST_F(ReplDevDynamicTest, OneMemberDown) {
// replica0(leader) and replica1 up, replica2 is down. Replace replica2 with replica3.
// replica0 should be able to baseline resync to replica4(new member).
// Write some IO's, replace a member, validate all members data except which is out.
LOGINFO("OneMemberDown test started replica={}", g_helper->replica_num());
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >();

// Replace the last member in the group with index(num_replicas - 1) with a spare
// replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N
uint32_t member_out = num_replicas - 1;
uint32_t member_in = num_replicas;

g_helper->sync_for_test_start(num_members);

this->shutdown_replica(2);
LOGINFO("Shutdown replica 2");

std::this_thread::sleep_for(std::chrono::seconds(3));
if (g_helper->replica_num() == 0) {
// With existing raft repl dev group, write IO's, validate and call replace_member on leader.
LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num());
this->write_on_leader(num_io_entries, true /* wait_for_commit */);

replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in));
std::this_thread::sleep_for(std::chrono::seconds(3));
} else if (g_helper->replica_num() == member_in) {
LOGINFO("Wait for commits replica={}", g_helper->replica_num());
wait_for_commits(num_io_entries);
}

g_helper->sync_for_verify_start(num_members);
LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num());
if (is_replica_num_in({0, 1, member_in})) {
// Skip the member which is going to be replaced. Validate data on all other replica's.
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
}

g_helper->sync_for_cleanup_start(num_members);

if (g_helper->replica_num() == 2) {
LOGINFO("Start replica 2");
db->set_zombie();
this->start_replica(2);
}

LOGINFO("TwoMemberDown test done");
LOGINFO("OneMemberDown test done replica={}", g_helper->replica_num());
}

// TODO add more tests with leader and member restart, multiple member replace
// leader replace
TEST_F(ReplDevDynamicTest, LeaderReplace) {
// replica0(leader) and replica1 and replica2 is up. Replace replica0(leader) with replica3.
// replica0 will yield leadership and any other replica will be come leader and leader
// will do baseline resync to replica4(new member).
// Write some IO's, replace a member, validate all members data except which is out.
LOGINFO("LeaderReplace test started replica={}", g_helper->replica_num());
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >();

// Replace the leader in the group with index(0) with a spare
// replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N
uint32_t member_out = 0;
uint32_t member_in = num_replicas;

g_helper->sync_for_test_start(num_members);

if (g_helper->replica_num() != member_in) {
LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num());
// With existing raft repl dev group, write IO's, validate and call replace_member on leader.
this->write_on_leader(num_io_entries, true /* wait_for_commit */);

// Leader will return error NOT_LEADER and yield leadership, sleep and connect again
// to the new leader.
LOGINFO("Replace old leader");
replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in), 0,
ReplServiceError::NOT_LEADER);
LOGINFO("Replace member leader yield done");

std::this_thread::sleep_for(std::chrono::seconds(3));
replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in));
LOGINFO("Replace member old leader done");
}

if (g_helper->replica_num() == member_in) {
LOGINFO("Wait for commits replica={}", g_helper->replica_num());
wait_for_commits(num_io_entries);
}

g_helper->sync_for_verify_start(num_members);
if (is_replica_num_in({0, 1, member_in})) {
// Skip the member which is going to be replaced. Validate data on all other replica's.
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
}

if (g_helper->replica_num() == member_out) { db->set_zombie(); }

g_helper->sync_for_cleanup_start(num_members);
LOGINFO("LeaderReplace test done replica={}", g_helper->replica_num());
}

TEST_F(ReplDevDynamicTest, OneMemberRestart) {
// replica0(leader) is up and replica1 is restated, replica2 is down. Replace replica2 with replica3.
// replica0 should be able to baseline resync to replica4(new member).
// Write some IO's, replace a member, validate all members data except which is out.
LOGINFO("OneMemberRestart test started replica={}", g_helper->replica_num());
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >();

// Replace the last member in the group with index(num_replicas - 1) with a spare
// replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N
uint32_t member_out = num_replicas - 1;
uint32_t member_in = num_replicas;

g_helper->sync_for_test_start(num_members);
if (g_helper->replica_num() == 1) {
LOGINFO("Restart replica 1");
this->restart_replica(15);
}

if (g_helper->replica_num() == 0) {
// With existing raft repl dev group, write IO's, validate and call replace_member on leader.
LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num());
this->write_on_leader(num_io_entries, true /* wait_for_commit */);

replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in));
std::this_thread::sleep_for(std::chrono::seconds(3));
} else if (g_helper->replica_num() == member_in) {
LOGINFO("Wait for commits replica={}", g_helper->replica_num());
wait_for_commits(num_io_entries);
}

g_helper->sync_for_verify_start(num_members);
LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num());
if (is_replica_num_in({0, 1, member_in})) {
// Skip the member which is going to be replaced. Validate data on all other replica's.
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
}

g_helper->sync_for_cleanup_start(num_members);
LOGINFO("OneMemberRestart test done replica={}", g_helper->replica_num());
}

int main(int argc, char* argv[]) {
int parsed_argc = argc;
Expand Down

0 comments on commit c4efe11

Please sign in to comment.