Skip to content

Commit

Permalink
Add raft commit quorum for replace member if two members down.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Sep 30, 2024
1 parent e76afe5 commit 4b09a0b
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class ReplDevListener {
virtual void on_destroy() = 0;

/// @brief Called when replace member is performed.
virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0;
virtual void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum = 0) = 0;

/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class ReplicationService {
/// @return A Future which gets called after schedule to release (before garbage collection is kicked in)
virtual folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) = 0;

virtual AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const = 0;
virtual AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) const = 0;

/// @brief Get the repl dev for a given group id if it is already created or opened
/// @param group_id Group id interested in
Expand Down
27 changes: 24 additions & 3 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,30 @@ bool RaftReplDev::join_group() {
return true;
}

AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid) {
AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid,
uint32_t commit_quorum) {
LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(),
boost::uuids::to_string(member_out_uuid), boost::uuids::to_string(member_in_uuid));

if (commit_quorum >= 1) {
// Two members are down and leader cant form the quorum. Reduce the quorum size.
reset_quorum_size(commit_quorum);
}

// Step 1: Check if leader itself is requested to move out.
if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) {
// If leader is the member requested to move out, then give up leadership and return error.
// Client will retry replace_member request to the new leader.
raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
RD_LOGI("Replace member leader is the member_out so yield leadership");
reset_quorum_size(0);
return make_async_error<>(ReplServiceError::NOT_LEADER);
}

// Step 2. Add the new member.
return m_msg_mgr.add_member(m_group_id, member_in_uuid)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_in_uuid, member_out_uuid](auto&& e) -> AsyncReplResult<> {
.thenValue([this, member_in_uuid, member_out_uuid, commit_quorum](auto&& e) -> AsyncReplResult<> {
// TODO Currently we ignore the cancelled, fix nuraft_mesg to not timeout
// when adding member. Member is added to cluster config until member syncs fully
// with atleast stop gap. This will take a lot of time for block or
Expand All @@ -143,6 +150,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
RD_LOGW("Ignoring error returned from nuraft add_member {}", e.error());
} else {
RD_LOGE("Replace member error in add member : {}", e.error());
reset_quorum_size(0);
return make_async_error<>(RaftReplService::to_repl_error(e.error()));
}
}
Expand All @@ -165,6 +173,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
LOGERROR("Replace member propose to raft failed {}", err);
reset_quorum_size(0);
return make_async_error<>(std::move(err));
}

Expand All @@ -175,7 +184,7 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
// entry and call exit_group() and leave().
return m_msg_mgr.rem_member(m_group_id, member_out_uuid)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_out](auto&& e) -> AsyncReplResult<> {
.thenValue([this, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
if (e.hasError()) {
// Ignore the server not found as server removed from the cluster
// as requests are idempotent and can be resend.
Expand All @@ -185,16 +194,28 @@ AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, repl
// Its ok to retry this request as the request
// of replace member is idempotent.
RD_LOGE("Replace member failed to remove member : {}", e.error());
reset_quorum_size(0);
return make_async_error<>(ReplServiceError::RETRY_REQUEST);
}
} else {
RD_LOGI("Replace member removed member={} from group_id={}", member_out, group_id_str());
}

// Revert the quorum size back to 0.
reset_quorum_size(0);
return make_async_success<>();
});
});
}

void RaftReplDev::reset_quorum_size(uint32_t commit_quorum) {
RD_LOGI("Reset raft quorum size={}", commit_quorum);
nuraft::raft_params params = raft_server()->get_current_params();
params.with_custom_commit_quorum_size(commit_quorum);
params.with_custom_election_quorum_size(commit_quorum);
raft_server()->update_params(params);
}

folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
// Set the intent to destroy the group
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; });
Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class RaftReplDev : public ReplDev,
virtual ~RaftReplDev() = default;

bool join_group();
AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in);
AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum);
folly::SemiFuture< ReplServiceError > destroy_group();

//////////////// All ReplDev overrides/implementation ///////////////////////
Expand Down Expand Up @@ -275,6 +275,7 @@ class RaftReplDev : public ReplDev,
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
void replace_member(repl_req_ptr_t rreq);
void reset_quorum_size(uint32_t commit_quorum);
};

} // namespace homestore
4 changes: 2 additions & 2 deletions src/lib/replication/service/generic_repl_svc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ void SoloReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
}
}

AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const {
AsyncReplResult<> SoloReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum) const {
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/service/generic_repl_svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class SoloReplService : public GenericReplService {
std::set< replica_id_t > const& members) override;
folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) const override;
};

class SoloReplServiceCPHandler : public CPCallbacks {
Expand Down
6 changes: 3 additions & 3 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,13 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki
add_repl_dev(group_id, rdev);
}

AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const {
AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum) const {
auto rdev_result = get_repl_dev(group_id);
if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }

return std::dynamic_pointer_cast< RaftReplDev >(rdev_result.value())
->replace_member(member_out, member_in)
->replace_member(member_out, member_in, commit_quorum)
.via(&folly::InlineExecutor::instance())
.thenValue([this](auto&& e) mutable {
if (e.hasError()) { return make_async_error<>(e.error()); }
Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class RaftReplService : public GenericReplService,
std::set< replica_id_t > const& members) override;
folly::SemiFuture< ReplServiceError > remove_repl_dev(group_id_t group_id) override;
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) const override;

private:
RaftReplDev* raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);
Expand Down
11 changes: 7 additions & 4 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override {
return blk_alloc_hints{};
}
void replace_member(replica_id_t member_out, replica_id_t member_in) override {}
void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum) override {}

void on_destroy() override {
LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(),
Expand Down Expand Up @@ -610,11 +610,14 @@ class RaftReplDevTestBase : public testing::Test {
void create_snapshot() { dbs_[0]->create_snapshot(); }
void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); }

void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in) {
this->run_on_leader(db, [this, db, member_out, member_in]() {
void replace_member(std::shared_ptr< TestReplicatedDB > db, replica_id_t member_out, replica_id_t member_in,
uint32_t commit_quorum = 0) {
this->run_on_leader(db, [this, db, member_out, member_in, commit_quorum]() {
LOGINFO("Replace member out={} in={}", boost::uuids::to_string(member_out),
boost::uuids::to_string(member_in));
auto v = hs()->repl_service().replace_member(db->repl_dev()->group_id(), member_out, member_in).get();
auto v = hs()->repl_service()
.replace_member(db->repl_dev()->group_id(), member_out, member_in, commit_quorum)
.get();
ASSERT_EQ(v.hasError(), false) << "Error in replacing member";
});
}
Expand Down
68 changes: 66 additions & 2 deletions src/tests/test_raft_repl_dev_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,73 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) {
LOGINFO("ReplaceMember test done");
}

TEST_F(ReplDevDynamicTest, TwoMemberDown) {
LOGINFO("TwoMemberDown test started");

// Make two members down in a group and leader cant reach a quorum.
// We set the custom quorum size to 1 and call replace member.
// Leader should do some writes to validate it has reach quorum size.
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();

uint64_t num_io_entries = SISL_OPTIONS["num_io"].as< uint64_t >();

// Replace the last member in the group with index(num_replicas - 1) with a spare
// replica with index (num_replica). Member id's are 0,...,num_replicas-1, num_replicas,...,N
uint32_t member_out = num_replicas - 1;
uint32_t member_in = num_replicas;

g_helper->sync_for_test_start(num_members);

// Shutdown replica 1 and replica 2 to simulate two member down.
if (g_helper->replica_num() == 1) {
this->shutdown_replica(1);
LOGINFO("Shutdown replica 1");
}

if (g_helper->replica_num() == 2) {
this->shutdown_replica(2);
LOGINFO("Shutdown replica 2");
}

if (g_helper->replica_num() == 0) {
// Replace down replica 2 with spare replica 3 with commit quorum 1
// so that leader can go ahead with replacing member.
LOGINFO("Replace member started");
replace_member(db, g_helper->replica_id(member_out), g_helper->replica_id(member_in), 1 /* commit quorum*/);
this->write_on_leader(num_io_entries, true /* wait_for_commit */);
LOGINFO("Leader completed num_io={}", num_io_entries);
}

if (g_helper->replica_num() == member_in) {
wait_for_commits(num_io_entries);
LOGINFO("Member in got all commits");
}

if (g_helper->replica_num() == 0 || g_helper->replica_num() == member_in) {
// Validate data on leader replica 0 and replica 3
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
this->validate_data();
}

g_helper->sync_for_cleanup_start(num_members);

if (g_helper->replica_num() == 1) {
LOGINFO("Start replica 1");
this->start_replica(1);
}
if (g_helper->replica_num() == 2) {
LOGINFO("Start replica 2");
this->start_replica(2);
}

LOGINFO("TwoMemberDown test done");
}

// TODO add more tests with leader and member restart, multiple member replace
// leader replace, commit quorum
// leader replace

int main(int argc, char* argv[]) {
int parsed_argc = argc;
Expand All @@ -89,7 +154,6 @@ int main(int argc, char* argv[]) {
// leadership_expiry time.
//
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
s.generic.repl_dev_cleanup_interval_sec = 1;

// Disable implicit flush and timer.
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class SoloReplDevTest : public testing::Test {
cintrusive< repl_req_ctx >& ctx) override {
LOGINFO("Received error={} on repl_dev", enum_name(error));
}
void replace_member(replica_id_t member_out, replica_id_t member_in) override {}
void replace_member(replica_id_t member_out, replica_id_t member_in, uint32_t commit_quorum) override {}
void on_destroy() override {}
};

Expand Down

0 comments on commit 4b09a0b

Please sign in to comment.