diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h
index 15dc4872a..8a31cd674 100644
--- a/src/include/homestore/replication/repl_dev.h
+++ b/src/include/homestore/replication/repl_dev.h
@@ -348,7 +348,7 @@ class ReplDevListener {
     virtual void on_destroy() = 0;
 
     /// @brief Called when replace member is performed.
-    virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0;
+    virtual void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum = 0) = 0;
 
     /// @brief Called when the snapshot is being created by nuraft
     virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;
diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp
index 8f535b855..f9b4f2986 100644
--- a/src/include/homestore/replication_service.hpp
+++ b/src/include/homestore/replication_service.hpp
@@ -41,8 +41,8 @@ class ReplicationService {
     /// @return A Future which gets called after schedule to release (before garbage collection is kicked in)
     virtual folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) = 0;
 
-    virtual AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
-                                             replica_id_t member_in) const = 0;
+    virtual AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
+                                             uint32_t commit_quorum = 0) const = 0;
 
     /// @brief Get the repl dev for a given group id if it is already created or opened
     /// @param group_id Group id interested in
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index e928f8996..39732a096 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -113,23 +113,30 @@ bool RaftReplDev::join_group() {
     return true;
 }
 
-AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid) {
+AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid,
+                                              uint32_t commit_quorum) {
     LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(),
             boost::uuids::to_string(member_out_uuid), boost::uuids::to_string(member_in_uuid));
 
+    if (commit_quorum >= 1) {
+        // Two members are down and the leader can't form a quorum. Reduce the quorum size.
+        reset_quorum_size(commit_quorum);
+    }
+
     // Step 1: Check if leader itself is requested to move out.
     if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) {
         // If leader is the member requested to move out, then give up leadership and return error.
         // Client will retry replace_member request to the new leader.
         raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
         RD_LOGI("Replace member leader is the member_out so yield leadership");
+        reset_quorum_size(0);
         return make_async_error<>(ReplServiceError::NOT_LEADER);
     }
 
     // Step 2. Add the new member.
     return m_msg_mgr.add_member(m_group_id, member_in_uuid)
         .via(&folly::InlineExecutor::instance())
-        .thenValue([this, member_in_uuid, member_out_uuid](auto&& e) -> AsyncReplResult<> {
+        .thenValue([this, member_in_uuid, member_out_uuid, commit_quorum](auto&& e) -> AsyncReplResult<> {
             // TODO Currently we ignore the cancelled, fix nuraft_mesg to not timeout
             // when adding member. Member is added to cluster config until member syncs fully
             // with atleast stop gap. This will take a lot of time for block or
@@ -143,6 +150,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
                 RD_LOGW("Ignoring error returned from nuraft add_member {}", e.error());
             } else {
                 RD_LOGE("Replace member error in add member : {}", e.error());
+                reset_quorum_size(0);
                 return make_async_error<>(RaftReplService::to_repl_error(e.error()));
             }
         }
@@ -165,6 +173,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
             auto err = m_state_machine->propose_to_raft(std::move(rreq));
             if (err != ReplServiceError::OK) {
                 LOGERROR("Replace member propose to raft failed {}", err);
+                reset_quorum_size(0);
                 return make_async_error<>(std::move(err));
             }
 
@@ -175,7 +184,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
             // entry and call exit_group() and leave().
             return m_msg_mgr.rem_member(m_group_id, member_out_uuid)
                 .via(&folly::InlineExecutor::instance())
-                .thenValue([this, member_out](auto&& e) -> AsyncReplResult<> {
+                .thenValue([this, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
                     if (e.hasError()) {
                         // Ignore the server not found as server removed from the cluster
                         // as requests are idempotent and can be resend.
@@ -185,16 +194,28 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
                         // Its ok to retry this request as the request
                         // of replace member is idempotent.
                         RD_LOGE("Replace member failed to remove member : {}", e.error());
+                        reset_quorum_size(0);
                         return make_async_error<>(ReplServiceError::RETRY_REQUEST);
                     }
                 } else {
                     RD_LOGI("Replace member removed member={} from group_id={}", member_out, group_id_str());
                 }
+
+                // Revert the quorum size back to 0.
+                reset_quorum_size(0);
                 return make_async_success<>();
             });
         });
 }
 
+void RaftReplDev::reset_quorum_size(uint32_t commit_quorum) {
+    RD_LOGI("Reset raft quorum size={}", commit_quorum);
+    nuraft::raft_params params = raft_server()->get_current_params();
+    params.with_custom_commit_quorum_size(commit_quorum);
+    params.with_custom_election_quorum_size(commit_quorum);
+    raft_server()->update_params(params);
+}
+
 folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
     // Set the intent to destroy the group
     m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; });
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index 82fdcaa23..621d45b36 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -155,7 +155,7 @@ class RaftReplDev : public ReplDev,
     virtual ~RaftReplDev() = default;
 
     bool join_group();
-    AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in);
+    AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum);
     folly::SemiFuture< ReplServiceError > destroy_group();
 
     //////////////// All ReplDev overrides/implementation ///////////////////////
@@ -275,6 +275,7 @@ class RaftReplDev : public ReplDev,
     void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
     void commit_blk(repl_req_ptr_t rreq);
     void replace_member(repl_req_ptr_t rreq);
+    void reset_quorum_size(uint32_t commit_quorum);
 };
 
 } // namespace homestore
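Note on the core change above: reset_quorum_size() round-trips NuRaft's raft_params (get_current_params(), with_custom_commit_quorum_size(), with_custom_election_quorum_size() and update_params() are existing NuRaft APIs), and per NuRaft's custom-quorum semantics a value of 0 falls back to the regular majority quorum. A minimal, self-contained sketch of the quorum arithmetic this relies on; effective_commit_quorum() is a hypothetical helper for illustration only, not HomeStore or NuRaft code:

    #include <cassert>
    #include <cstdint>

    // Mirrors the convention used in the patch: custom quorum 0 means
    // "no override, use the default majority"; any value >= 1 is taken
    // as the exact quorum size.
    static uint32_t effective_commit_quorum(uint32_t num_members, uint32_t custom_quorum) {
        return (custom_quorum == 0) ? (num_members / 2 + 1) : custom_quorum;
    }

    int main() {
        assert(effective_commit_quorum(3, 0) == 2); // healthy 3-member group: majority of 2 acks
        assert(effective_commit_quorum(3, 1) == 1); // two members down: leader may commit alone
        return 0;
    }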
diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp
index 89800df3f..8e5c9a7a1 100644
--- a/src/lib/replication/service/generic_repl_svc.cpp
+++ b/src/lib/replication/service/generic_repl_svc.cpp
@@ -147,8 +147,8 @@ void SoloReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
     }
 }
 
-AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_id_t member_out,
-                                                  replica_id_t member_in) const {
+AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
+                                                  uint32_t commit_quorum) const {
     return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
 }
 
diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h
index e2d445427..5e0cb84a3 100644
--- a/src/lib/replication/service/generic_repl_svc.h
+++ b/src/lib/replication/service/generic_repl_svc.h
@@ -73,8 +73,8 @@ class SoloReplService : public GenericReplService {
                                              std::set< replica_id_t > const& members) override;
     folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
     void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
-    AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
-                                     replica_id_t member_in) const override;
+    AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
+                                     uint32_t commit_quorum = 0) const override;
 };
 
 class SoloReplServiceCPHandler : public CPCallbacks {
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index bbf921685..c61e5c132 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -330,13 +330,13 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
     add_repl_dev(group_id, rdev);
 }
 
-AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out,
-                                                  replica_id_t member_in) const {
+AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
+                                                  uint32_t commit_quorum) const {
     auto rdev_result = get_repl_dev(group_id);
     if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }
 
     return std::dynamic_pointer_cast< RaftReplDev >(rdev_result.value())
-        ->replace_member(member_out, member_in)
+        ->replace_member(member_out, member_in, commit_quorum)
         .via(&folly::InlineExecutor::instance())
         .thenValue([this](auto&& e) mutable {
             if (e.hasError()) { return make_async_error<>(e.error()); }
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index cba90e2e0..160e4851d 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -69,8 +69,8 @@ class RaftReplService : public GenericReplService,
                                              std::set< replica_id_t > const& members) override;
     folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
     void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
-    AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
-                                     replica_id_t member_in) const override;
+    AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
+                                     uint32_t commit_quorum = 0) const override;
 
 private:
     RaftReplDev* raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);
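Usage note for the service-level overload: callers should treat NOT_LEADER as retryable, since the leader yields leadership when it is itself the member being moved out. A hedged caller-side sketch against the patched ReplicationService API; request_replace() and its policy are illustrative only, and the AsyncReplResult<>/make_async_* helpers are the ones shown in this diff:

    AsyncReplResult<> request_replace(ReplicationService& svc, group_id_t group_id,
                                      replica_id_t member_out, replica_id_t member_in) {
        // Pass commit_quorum=1 only when enough members are down that the
        // default majority quorum cannot be met; 0 keeps the default behavior.
        return svc.replace_member(group_id, member_out, member_in, 1 /* commit_quorum */)
            .via(&folly::InlineExecutor::instance())
            .thenValue([](auto&& e) -> AsyncReplResult<> {
                if (e.hasError()) {
                    // On NOT_LEADER the old leader was member_out and has yielded;
                    // resend to the new leader (the request is idempotent).
                    return make_async_error<>(e.error());
                }
                return make_async_success<>();
            });
    }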
diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp
index 7b96afa4c..b9eb1b6c7 100644
--- a/src/tests/test_common/raft_repl_test_base.hpp
+++ b/src/tests/test_common/raft_repl_test_base.hpp
@@ -301,7 +301,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
     ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override {
         return blk_alloc_hints{};
     }
-    void replace_member(replica_id_t member_out, replica_id_t member_in) override {}
+    void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum) override {}
 
     void on_destroy() override {
         LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(),
@@ -610,11 +610,14 @@ class RaftReplDevTestBase : public testing::Test {
     void create_snapshot() { dbs_[0]->create_snapshot(); }
     void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }
 
-    void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in) {
-        this->run_on_leader(db, [this, db, member_out, member_in]() {
+    void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in,
+                        uint32_t commit_quorum = 0) {
+        this->run_on_leader(db, [this, db, member_out, member_in, commit_quorum]() {
             LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out),
                     boost::uuids::to_string(member_in));
-            auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), member_out, member_in).get();
+            auto v = hs()->repl_service()
+                         .replace_member(db->repl_dev()->group_id(), member_out, member_in, commit_quorum)
+                         .get();
             ASSERT_EQ(v.hasError(), false) << "Error in replacing member";
         });
     }
diff --git a/src/tests/test_raft_repl_dev_dynamic.cpp b/src/tests/test_raft_repl_dev_dynamic.cpp
index 7bd69a13c..c29f239e1 100644
--- a/src/tests/test_raft_repl_dev_dynamic.cpp
+++ b/src/tests/test_raft_repl_dev_dynamic.cpp
@@ -65,8 +65,73 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) {
     LOGINFO("ReplaceMember test done");
 }
 
+TEST_F(ReplDevDynamicTest, TwoMemberDown) {
+    LOGINFO("TwoMemberDown test started");
+
+    // Bring two members down in a group so that the leader can't reach quorum.
+    // We set the custom quorum size to 1 and call replace member.
+    // The leader should then do some writes to validate that it can reach the reduced quorum.
+    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
+    auto db = dbs_.back();
+    auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
+    auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
+
+    uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >();
+
+    // Replace the last member in the group with index (num_replicas - 1) with a spare
+    // replica with index (num_replicas). Member ids are 0,...,num_replicas-1, num_replicas,...,N
+    uint32_t member_out = num_replicas - 1;
+    uint32_t member_in = num_replicas;
+
+    g_helper->sync_for_test_start(num_members);
+
+    // Shutdown replica 1 and replica 2 to simulate two members down.
+    if (g_helper->replica_num() == 1) {
+        this->shutdown_replica(1);
+        LOGINFO("Shutdown replica 1");
+    }
+
+    if (g_helper->replica_num() == 2) {
+        this->shutdown_replica(2);
+        LOGINFO("Shutdown replica 2");
+    }
+
+    if (g_helper->replica_num() == 0) {
+        // Replace the down replica 2 with spare replica 3 using commit quorum 1
+        // so that the leader can go ahead with replacing the member.
+        LOGINFO("Replace member started");
+        replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in), 1 /* commit quorum */);
+        this->write_on_leader(num_io_entries, true /* wait_for_commit */);
+        LOGINFO("Leader completed num_io={}", num_io_entries);
+    }
+
+    if (g_helper->replica_num() == member_in) {
+        wait_for_commits(num_io_entries);
+        LOGINFO("Member in got all commits");
+    }
+
+    if (g_helper->replica_num() == 0 || g_helper->replica_num() == member_in) {
+        // Validate data on leader replica 0 and replica 3.
+        LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
+        this->validate_data();
+    }
+
+    g_helper->sync_for_cleanup_start(num_members);
+
+    if (g_helper->replica_num() == 1) {
+        LOGINFO("Start replica 1");
+        this->start_replica(1);
+    }
+    if (g_helper->replica_num() == 2) {
+        LOGINFO("Start replica 2");
+        this->start_replica(2);
+    }
+
+    LOGINFO("TwoMemberDown test done");
+}
+
 // TODO add more tests with leader and member restart, multiple member replace
-// leader replace, commit quorum
+// leader replace
 
 int main(int argc, char* argv[]) {
     int parsed_argc = argc;
@@ -89,7 +154,6 @@ int main(int argc, char* argv[]) {
     // leadership_expiry time.
     //
     HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
-        s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
         s.generic.repl_dev_cleanup_interval_sec = 1;
 
         // Disable implicit flush and timer.
diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp
index c26ba273d..ce55ec23b 100644
--- a/src/tests/test_solo_repl_dev.cpp
+++ b/src/tests/test_solo_repl_dev.cpp
@@ -136,7 +136,7 @@ class SoloReplDevTest : public testing::Test {
                              cintrusive< repl_req_ctx >& ctx) override {
         LOGINFO("Received error={} on repl_dev", enum_name(error));
     }
-    void replace_member(replica_id_t member_out, replica_id_t member_in) override {}
+    void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum) override {}
 
     void on_destroy() override {}
 };