diff --git a/conanfile.py b/conanfile.py index f61f1ef54..0cb18bd1d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.4" + version = "6.5.5" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 6e920b997..d1b210526 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -9,6 +10,7 @@ #include "repl_dev/raft_repl_dev.h" #include #include "common/homestore_config.hpp" +#include "common/crash_simulator.hpp" SISL_LOGGING_DECL(replication) @@ -293,6 +295,14 @@ void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, // Update the object offset. obj_id = snp_data->offset; + +#ifdef _PRERELEASE + if (iomgr_flip::instance()->test_flip("baseline_resync_restart_new_follower")) { + LOGINFO("Hit flip baseline_resync_restart_new_follower crashing"); + hs()->crash_simulator().crash(); + return; + } +#endif } bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 672acffcb..c9ff71567 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -252,6 +252,10 @@ class HSReplTestHelper : public HSTestHelper { start_homestore(); } + void reinit_repl_app() { + m_token.params(HS_SERVICE::REPLICATION).repl_app = std::make_unique< TestReplApplication >(*this); + } + uint16_t replica_num() const { return replica_num_; } homestore::replica_id_t my_replica_id() const { return my_replica_id_; } homestore::replica_id_t replica_id(uint16_t member_id) const { @@ -317,6 +321,11 @@ class HSReplTestHelper : public HSTestHelper { } } + void add_listener(std::shared_ptr< ReplDevListener > listener) { + std::unique_lock lg(groups_mtx_); + pending_listeners_.emplace_back(listener); + } + size_t num_listeners() const { std::unique_lock lg(groups_mtx_); return repl_groups_.size(); diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index e0e2f6487..1ab90143a 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -204,7 +204,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { kv_snapshot_data.emplace_back(Key{v.id_}, v); LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - if (kv_snapshot_data.size() >= 1000) { break; } + if (kv_snapshot_data.size() >= 10) { break; } } if (kv_snapshot_data.size() == 0) { @@ -430,6 +430,7 @@ class RaftReplDevTestBase : public testing::Test { for (auto const& db : dbs_) { if (db->is_zombie()) { continue; } auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + if (!repl_dev) continue; int i = 0; bool force_leave = false; do { @@ -511,6 +512,11 @@ class RaftReplDevTestBase : public testing::Test { } void run_on_leader(std::shared_ptr< TestReplicatedDB > db, auto&& lambda) { + if (!db || !db->repl_dev()) { + // Spare which are not added to group will not have repl dev. + return; + } + do { auto leader_uuid = db->repl_dev()->get_leader_id(); @@ -527,6 +533,8 @@ class RaftReplDevTestBase : public testing::Test { } void write_on_leader(uint32_t num_entries, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) { + if (dbs_[0]->repl_dev() == nullptr) return; + do { auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); @@ -614,14 +622,20 @@ class RaftReplDevTestBase : public testing::Test { void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in, - uint32_t commit_quorum = 0) { - this->run_on_leader(db, [this, db, member_out, member_in, commit_quorum]() { + uint32_t commit_quorum = 0, ReplServiceError error = ReplServiceError::OK) { + this->run_on_leader(db, [this, error, db, member_out, member_in, commit_quorum]() { LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out), boost::uuids::to_string(member_in)); + replica_member_info out{member_out, ""}; replica_member_info in{member_in, ""}; - auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), out, in, commit_quorum).get(); - ASSERT_EQ(v.hasError(), false) << "Error in replacing member"; + auto result = hs()->repl_service().replace_member(db->repl_dev()->group_id(), out, in, commit_quorum).get(); + if (error == ReplServiceError::OK) { + ASSERT_EQ(result.hasError(), false) << "Error in replacing member"; + } else { + ASSERT_EQ(result.hasError(), true) << "Error in replacing member"; + ASSERT_EQ(result.error(), error); + } }); } diff --git a/src/tests/test_raft_repl_dev_dynamic.cpp b/src/tests/test_raft_repl_dev_dynamic.cpp index c29f239e1..5a6095959 100644 --- a/src/tests/test_raft_repl_dev_dynamic.cpp +++ b/src/tests/test_raft_repl_dev_dynamic.cpp @@ -15,11 +15,17 @@ #include "test_common/raft_repl_test_base.hpp" // Dynamic tests spawn spare replica's also which can be used to add and remove from a repl dev. -class ReplDevDynamicTest : public RaftReplDevTestBase {}; +class ReplDevDynamicTest : public RaftReplDevTestBase { +private: + bool is_replica_num_in(const std::set< uint32_t >& replicas) { + // Check if the current replica process is in this set. + return replicas.count(g_helper->replica_num()) != 0 ? true : false; + } +}; TEST_F(ReplDevDynamicTest, ReplaceMember) { + LOGINFO("ReplaceMember test started replica={}", g_helper->replica_num()); // Write some IO's, replace a member, validate all members data except which is out. - LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); auto db = dbs_.back(); auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); @@ -45,28 +51,28 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) { g_helper->sync_for_verify_start(num_members); LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num()); - if (g_helper->replica_num() != member_out) { + if (is_replica_num_in({0, 1, member_in})) { // Skip the member which is going to be replaced. Validate data on all other replica's. LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); this->validate_data(); - } else { + } else if (g_helper->replica_num() == member_out) { // The out member will have the repl dev destroyed. auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); - do { + while (repl_dev && !repl_dev->is_destroyed()) { std::this_thread::sleep_for(std::chrono::seconds(1)); auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); raft_repl_svc.gc_repl_devs(); LOGINFO("Waiting for repl dev to get destroyed on out member replica={}", g_helper->replica_num()); - } while (!repl_dev->is_destroyed()); + } LOGINFO("Repl dev destroyed on out member replica={}", g_helper->replica_num()); } g_helper->sync_for_cleanup_start(num_members); - LOGINFO("ReplaceMember test done"); + LOGINFO("ReplaceMember test done replica={}", g_helper->replica_num()); } TEST_F(ReplDevDynamicTest, TwoMemberDown) { - LOGINFO("TwoMemberDown test started"); + LOGINFO("TwoMemberDown test started replica={}", g_helper->replica_num()); // Make two members down in a group and leader cant reach a quorum. // We set the custom quorum size to 1 and call replace member. @@ -110,28 +116,176 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) { LOGINFO("Member in got all commits"); } - if (g_helper->replica_num() == 0 || g_helper->replica_num() == member_in) { + if (is_replica_num_in({0, member_in})) { // Validate data on leader replica 0 and replica 3 LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); this->validate_data(); } - g_helper->sync_for_cleanup_start(num_members); - if (g_helper->replica_num() == 1) { LOGINFO("Start replica 1"); + db->set_zombie(); this->start_replica(1); } if (g_helper->replica_num() == 2) { LOGINFO("Start replica 2"); + db->set_zombie(); + this->start_replica(2); + } + + g_helper->sync_for_cleanup_start(num_members); + LOGINFO("TwoMemberDown test done replica={}", g_helper->replica_num()); +} + +TEST_F(ReplDevDynamicTest, OneMemberDown) { + // replica0(leader) and replica1 up, replica2 is down. Replace replica2 with replica3. + // replica0 should be able to baseline resync to replica4(new member). + // Write some IO's, replace a member, validate all members data except which is out. + LOGINFO("OneMemberDown test started replica={}", g_helper->replica_num()); + auto db = dbs_.back(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); + auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); + + // Replace the last member in the group with index(num_replicas - 1) with a spare + // replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N + uint32_t member_out = num_replicas - 1; + uint32_t member_in = num_replicas; + + g_helper->sync_for_test_start(num_members); + + this->shutdown_replica(2); + LOGINFO("Shutdown replica 2"); + + std::this_thread::sleep_for(std::chrono::seconds(3)); + if (g_helper->replica_num() == 0) { + // With existing raft repl dev group, write IO's, validate and call replace_member on leader. + LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num()); + this->write_on_leader(num_io_entries, true /* wait_for_commit */); + + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in)); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } else if (g_helper->replica_num() == member_in) { + LOGINFO("Wait for commits replica={}", g_helper->replica_num()); + wait_for_commits(num_io_entries); + } + + g_helper->sync_for_verify_start(num_members); + LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num()); + if (is_replica_num_in({0, 1, member_in})) { + // Skip the member which is going to be replaced. Validate data on all other replica's. + LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); + this->validate_data(); + } + + g_helper->sync_for_cleanup_start(num_members); + + if (g_helper->replica_num() == 2) { + LOGINFO("Start replica 2"); + db->set_zombie(); this->start_replica(2); } - LOGINFO("TwoMemberDown test done"); + LOGINFO("OneMemberDown test done replica={}", g_helper->replica_num()); } -// TODO add more tests with leader and member restart, multiple member replace -// leader replace +TEST_F(ReplDevDynamicTest, LeaderReplace) { + // replica0(leader) and replica1 and replica2 is up. Replace replica0(leader) with replica3. + // replica0 will yield leadership and any other replica will be come leader and leader + // will do baseline resync to replica4(new member). + // Write some IO's, replace a member, validate all members data except which is out. + LOGINFO("LeaderReplace test started replica={}", g_helper->replica_num()); + auto db = dbs_.back(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); + auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); + + // Replace the leader in the group with index(0) with a spare + // replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N + uint32_t member_out = 0; + uint32_t member_in = num_replicas; + + g_helper->sync_for_test_start(num_members); + + if (g_helper->replica_num() != member_in) { + LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num()); + // With existing raft repl dev group, write IO's, validate and call replace_member on leader. + this->write_on_leader(num_io_entries, true /* wait_for_commit */); + + // Leader will return error NOT_LEADER and yield leadership, sleep and connect again + // to the new leader. + LOGINFO("Replace old leader"); + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in), 0, + ReplServiceError::NOT_LEADER); + LOGINFO("Replace member leader yield done"); + + std::this_thread::sleep_for(std::chrono::seconds(3)); + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in)); + LOGINFO("Replace member old leader done"); + } + + if (g_helper->replica_num() == member_in) { + LOGINFO("Wait for commits replica={}", g_helper->replica_num()); + wait_for_commits(num_io_entries); + } + + g_helper->sync_for_verify_start(num_members); + if (is_replica_num_in({0, 1, member_in})) { + // Skip the member which is going to be replaced. Validate data on all other replica's. + LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); + this->validate_data(); + } + + if (g_helper->replica_num() == member_out) { db->set_zombie(); } + + g_helper->sync_for_cleanup_start(num_members); + LOGINFO("LeaderReplace test done replica={}", g_helper->replica_num()); +} + +TEST_F(ReplDevDynamicTest, OneMemberRestart) { + // replica0(leader) is up and replica1 is restated, replica2 is down. Replace replica2 with replica3. + // replica0 should be able to baseline resync to replica4(new member). + // Write some IO's, replace a member, validate all members data except which is out. + LOGINFO("OneMemberRestart test started replica={}", g_helper->replica_num()); + auto db = dbs_.back(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); + auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >(); + uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); + + // Replace the last member in the group with index(num_replicas - 1) with a spare + // replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N + uint32_t member_out = num_replicas - 1; + uint32_t member_in = num_replicas; + + g_helper->sync_for_test_start(num_members); + if (g_helper->replica_num() == 1) { + LOGINFO("Restart replica 1"); + this->restart_replica(15); + } + + if (g_helper->replica_num() == 0) { + // With existing raft repl dev group, write IO's, validate and call replace_member on leader. + LOGINFO("Writing on leader num_io={} replica={}", num_io_entries, g_helper->replica_num()); + this->write_on_leader(num_io_entries, true /* wait_for_commit */); + + replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in)); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } else if (g_helper->replica_num() == member_in) { + LOGINFO("Wait for commits replica={}", g_helper->replica_num()); + wait_for_commits(num_io_entries); + } + + g_helper->sync_for_verify_start(num_members); + LOGINFO("sync_for_verify_state replica={} ", g_helper->replica_num()); + if (is_replica_num_in({0, 1, member_in})) { + // Skip the member which is going to be replaced. Validate data on all other replica's. + LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num()); + this->validate_data(); + } + + g_helper->sync_for_cleanup_start(num_members); + LOGINFO("OneMemberRestart test done replica={}", g_helper->replica_num()); +} int main(int argc, char* argv[]) { int parsed_argc = argc;