Skip to content

Commit

Permalink
Add support for raft repl dev replace member.
Browse files Browse the repository at this point in the history
When replacing a member, add the new member, sync raft log
for replace and finally remove the old member. Once we add
new member, baseline or incremental resync will start.
Remove the old member will cause nuraft mesg to exit
the group and we periodically gc the destroyed group.
Made the repl dev base test common so that both tests files
can use. Tests by default create repl group with num_replica's.
Dynamic tests create additional spare replica's which can be
added to the test dynamically by calling replace member.
  • Loading branch information
sanebay committed Sep 19, 2024
1 parent 333d05b commit e98bb96
Show file tree
Hide file tree
Showing 14 changed files with 913 additions and 629 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.61"
version = "6.4.62"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
19 changes: 10 additions & 9 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ namespace homestore {
VENUM(ReplServiceError, int32_t,
OK = 0, // Everything OK
CANCELLED = -1, // Request was cancelled
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,
TERM_MISMATCH = -11,
RESULT_NOT_EXIST_YET = -10000,
TERM_MISMATCH = -11,
RETRY_REQUEST = -12,
RESULT_NOT_EXIST_YET = -10000,
NOT_IMPLEMENTED = -10001,
NO_SPACE_LEFT = -20000,
DRIVE_WRITE_ERROR = -20001,
Expand Down
6 changes: 5 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ VENUM(repl_req_state_t, uint32_t,
VENUM(journal_type_t, uint16_t,
HS_DATA_LINKED = 0, // Linked data where each entry will store physical blkid where data reside
HS_DATA_INLINED = 1, // Data is inlined in the header of journal entry
HS_CTRL_DESTROY = 2 // Control message to destroy the repl_dev
HS_CTRL_DESTROY = 2, // Control message to destroy the repl_dev
HS_CTRL_REPLACE = 3, // Control message to replace a member
)

struct repl_key {
Expand Down Expand Up @@ -346,6 +347,9 @@ class ReplDevListener {
/// after restart in case crash happened during the destroy.
virtual void on_destroy() = 0;

/// @brief Called when replace member is performed.
virtual void replace_member(replica_id_t member_out, replica_id_t member_in) = 0;

/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ std::string repl_req_ctx::to_string() const {
}

std::string repl_req_ctx::to_compact_string() const {
if (m_op_code == journal_type_t::HS_CTRL_DESTROY) {
if (m_op_code == journal_type_t::HS_CTRL_DESTROY || m_op_code == journal_type_t::HS_CTRL_REPLACE) {
return fmt::format("term={} lsn={} op={}", m_rkey.term, m_lsn, enum_name(m_op_code));
}

return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn,
enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state())));
}
Expand Down
94 changes: 88 additions & 6 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,79 @@ bool RaftReplDev::join_group() {
m_msg_mgr.join_group(m_group_id, "homestore_replication",
std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this()));
if (!raft_result) {
HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(m_group_id),
raft_result.error());
HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", group_id_str(), raft_result.error());
return false;
}
return true;
}

AsyncReplResult<> RaftReplDev::replace_member(replica_id_t member_out_uuid, replica_id_t member_in_uuid) {
LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(),
boost::uuids::to_string(member_out_uuid), boost::uuids::to_string(member_in_uuid));

// Step 1: Check if leader itself is requested to move out.
if (m_my_repl_id == member_out_uuid && m_my_repl_id == get_leader_id()) {
// If leader is the member requested to move out, then give up leadership and return error.
// Client will retry replace_member request to the new leader.
raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
LOGINFO("Replace member leader is the member_out so yield leadership");
return make_async_error<>(ReplServiceError::NOT_LEADER);
}

// Step 2. Add the new member.
return m_msg_mgr.add_member(m_group_id, member_in_uuid)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_in_uuid, member_out_uuid](auto&& e) -> AsyncReplResult<> {
// TODO Currently we ignore the cancelled, fix nuraft_mesg to not timeout
// when adding member. Member is added to cluster config until member syncs fully
// with atleast stop gap. This will take a lot of time for block or
// object storage.
if (e.hasError() && e.error() != nuraft::cmd_result_code::CANCELLED) {
RD_LOGE("Replace member error in add member : {}", e.error());
return make_async_error<>(RaftReplService::to_repl_error(e.error()));
}
auto member_out = boost::uuids::to_string(member_out_uuid);
auto member_in = boost::uuids::to_string(member_in_uuid);

LOGINFO("Replace member added member={} to group_id={}", member_in, group_id_str());

// Step 3. Append log entry to mark the old member is out and new member is added.
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
replace_members_ctx members;
std::copy(member_in_uuid.begin(), member_in_uuid.end(), members.in.begin());
std::copy(member_out_uuid.begin(), member_out_uuid.end(), members.out.begin());
sisl::blob header(r_cast< uint8_t* >(&members), members.in.size() + members.out.size());
rreq->init(
repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0);

auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
LOGERROR("Replace member propose to raft failed {}", err);
return make_async_error<>(std::move(err));
}

LOGINFO("Replace member proposed to raft group_id={}", group_id_str());

// Step 4. Remove the old member. Even if the old member is temporarily
// down and recovers, nuraft mesg see member remove from cluster log
// entry and call exit_group() and leave().
return m_msg_mgr.rem_member(m_group_id, member_out_uuid)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_out](auto&& e) -> AsyncReplResult<> {
if (e.hasError()) {
// Its ok to retry this request as the request
// of replace member is idempotent.
RD_LOGE("Replace member failed to remove member : {}", e.error());
return make_async_error<>(ReplServiceError::RETRY_REQUEST);
} else {
LOGINFO("Replace member removed member={} from group_id={}", member_out, group_id_str());
}
return make_async_success<>();
});
});
}

folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
// Set the intent to destroy the group
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; });
Expand Down Expand Up @@ -141,7 +207,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
LOGERROR("RaftReplDev::destroy_group failed {}", err);
}

LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id));
LOGINFO("Raft repl dev destroy_group={}", group_id_str());
return m_destroy_promise.getSemiFuture();
}

Expand Down Expand Up @@ -786,6 +852,8 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string());
if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
leave();
} else if (rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) {
replace_member(rreq);
} else {
m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq);
}
Expand Down Expand Up @@ -820,7 +888,8 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)
blkid.to_string());
});
}
} else if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
} else if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY ||
rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) {
if (rreq->is_proposer()) { m_destroy_promise.setValue(err); }
}

Expand All @@ -836,6 +905,17 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)
rreq->clear();
}

void RaftReplDev::replace_member(repl_req_ptr_t rreq) {
auto members = r_cast< const replace_members_ctx* >(rreq->header().cbytes());
replica_id_t member_in, member_out;
std::copy(members->out.begin(), members->out.end(), member_out.begin());
std::copy(members->in.begin(), members->in.end(), member_in.begin());
RD_LOGI("Raft repl replace_member member_out={} member_in={}", boost::uuids::to_string(member_out),
boost::uuids::to_string(member_in));

m_listener->replace_member(member_out, member_in);
}

static bool blob_equals(sisl::blob const& a, sisl::blob const& b) {
if (a.size() != b.size()) { return false; }
return (std::memcmp(a.cbytes(), b.cbytes(), a.size()) == 0);
Expand Down Expand Up @@ -971,12 +1051,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["config"] = serialize_cluster_config(config);
m_raft_config_sb.write();
RD_LOGI("Saved config {}", (*m_raft_config_sb)["config"].dump());
}

void RaftReplDev::save_state(const nuraft::srv_state& state) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}};
m_raft_config_sb.write();
RD_LOGI("Saved state {}", (*m_raft_config_sb)["state"].dump());
}

nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() {
Expand Down Expand Up @@ -1013,7 +1095,7 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_
std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; }

void RaftReplDev::permanent_destroy() {
RD_LOGI("Permanent destroy for raft repl dev");
RD_LOGI("Permanent destroy for raft repl dev group_id={}", group_id_str());
m_rd_sb.destroy();
m_raft_config_sb.destroy();
m_data_journal->remove_store();
Expand All @@ -1035,7 +1117,7 @@ void RaftReplDev::leave() {
m_rd_sb->destroy_pending = 0x1;
m_rd_sb.write();

RD_LOGI("RaftReplDev leave group");
RD_LOGI("RaftReplDev leave group_id={}", group_id_str());
m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
}

Expand Down
7 changes: 7 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

struct replace_members_ctx {
std::array< uint8_t, 16 > out;
std::array< uint8_t, 16 > in;
};

class RaftReplDevMetrics : public sisl::MetricsGroup {
public:
explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) {
Expand Down Expand Up @@ -150,6 +155,7 @@ class RaftReplDev : public ReplDev,
virtual ~RaftReplDev() = default;

bool join_group();
AsyncReplResult<> replace_member(replica_id_t member_out, replica_id_t member_in);
folly::SemiFuture< ReplServiceError > destroy_group();

//////////////// All ReplDev overrides/implementation ///////////////////////
Expand Down Expand Up @@ -268,6 +274,7 @@ class RaftReplDev : public ReplDev,
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
void replace_member(repl_req_ptr_t rreq);
};

} // namespace homestore
14 changes: 13 additions & 1 deletion src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ void RaftReplService::start() {
.with_hb_interval(HS_DYNAMIC_CONFIG(consensus.heartbeat_period_ms))
.with_max_append_size(HS_DYNAMIC_CONFIG(consensus.max_append_batch_size))
.with_log_sync_batch_size(HS_DYNAMIC_CONFIG(consensus.log_sync_batch_size))
// TODO fix the log_gap thresholds when adding new member.
#if 0
.with_log_sync_stopping_gap(HS_DYNAMIC_CONFIG(consensus.min_log_gap_to_join))
#endif
.with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold))
.with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold))
.with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
Expand Down Expand Up @@ -327,7 +330,16 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki

AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const {
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
auto rdev_result = get_repl_dev(group_id);
if (!rdev_result) { return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND); }

return std::dynamic_pointer_cast< RaftReplDev >(rdev_result.value())
->replace_member(member_out, member_in)
.via(&folly::InlineExecutor::instance())
.thenValue([this](auto&& e) mutable {
if (e.hasError()) { return make_async_error<>(e.error()); }
return make_async_success<>();
});
}

////////////////////// Reaper Thread related //////////////////////////////////
Expand Down
1 change: 0 additions & 1 deletion src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class RaftReplService : public GenericReplService,
void gc_repl_devs();
void gc_repl_reqs();
void flush_durable_commit_lsn();

};

class RaftReplServiceCPHandler : public CPCallbacks {
Expand Down
6 changes: 6 additions & 0 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ if (${io_tests})
target_sources(test_raft_repl_dev PRIVATE test_raft_repl_dev.cpp)
target_link_libraries(test_raft_repl_dev homestore ${COMMON_TEST_DEPS} GTest::gmock)

add_executable(test_raft_repl_dev_dynamic)
target_sources(test_raft_repl_dev_dynamic PRIVATE test_raft_repl_dev_dynamic.cpp)
target_link_libraries(test_raft_repl_dev_dynamic homestore ${COMMON_TEST_DEPS} GTest::gmock)

can_build_epoll_io_tests(epoll_tests)
if(${epoll_tests})
add_test(NAME LogDev-Epoll COMMAND test_log_dev)
Expand All @@ -126,6 +130,7 @@ if (${io_tests})
add_test(NAME MetaBlkMgr-Epoll COMMAND test_meta_blk_mgr)
add_test(NAME DataService-Epoll COMMAND test_data_service)
add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev)
add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic)
# add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
endif()

Expand All @@ -138,6 +143,7 @@ if (${io_tests})
add_test(NAME SoloReplDev-Spdk COMMAND test_solo_repl_dev -- --spdk "true")
add_test(NAME HomeRaftLogStore-Spdk COMMAND test_home_raft_logstore -- --spdk "true")
add_test(NAME RaftReplDev-Spdk COMMAND test_raft_repl_dev -- --spdk "true")
add_test(NAME RaftReplDevDynamic-Spdk COMMAND test_raft_repl_dev_dynamic -- --spdk "true")
if(${epoll_tests})
SET_TESTS_PROPERTIES(MetaBlkMgr-Spdk PROPERTIES DEPENDS LogStore-Spdk)
SET_TESTS_PROPERTIES(DataService-Spdk PROPERTIES DEPENDS MetaBlkMgr-Spdk)
Expand Down
19 changes: 14 additions & 5 deletions src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
SISL_OPTION_GROUP(test_repl_common_setup,
(replicas, "", "replicas", "Total number of replicas",
::cxxopts::value< uint32_t >()->default_value("3"), "number"),
(spare_replicas, "", "spare_replicas", "Additional number of spare replicas not part of repldev",
::cxxopts::value< uint32_t >()->default_value("1"), "number"),
(base_port, "", "base_port", "Port number of first replica",
::cxxopts::value< uint16_t >()->default_value("4000"), "number"),
(replica_num, "", "replica_num",
Expand Down Expand Up @@ -134,11 +136,12 @@ class HSReplTestHelper : public HSTestHelper {
HSReplTestHelper(std::string const& name, std::vector< std::string > const& args, char** argv) :
name_{name}, args_{args}, argv_{argv} {}

void setup() {
void setup(uint32_t num_replicas) {
num_replicas_ = num_replicas;
replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >();

sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_));
sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v");
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();

boost::uuids::string_generator gen;
for (uint32_t i{0}; i < num_replicas; ++i) {
Expand Down Expand Up @@ -226,7 +229,7 @@ class HSReplTestHelper : public HSTestHelper {

void reset_setup() {
teardown();
setup();
setup(num_replicas_);
}

void restart(uint32_t shutdown_delay_secs = 5u) {
Expand Down Expand Up @@ -273,8 +276,12 @@ class HSReplTestHelper : public HSTestHelper {

if (replica_num_ == 0) {
std::set< homestore::replica_id_t > members;
std::transform(members_.begin(), members_.end(), std::inserter(members, members.end()),
[](auto const& p) { return p.first; });
// By default we create repl dev with number of members equal to replicas argument.
// We dont add spare replica's to the group by default.
for (auto& m : members_) {
if (m.second < SISL_OPTIONS["replicas"].as< uint32_t >()) { members.insert(m.first); }
}

group_id_t repl_group_id = hs_utils::gen_random_uuid();
{
std::unique_lock lg(groups_mtx_);
Expand All @@ -299,6 +306,7 @@ class HSReplTestHelper : public HSTestHelper {
auto listener = std::move(pending_listeners_[0]);
repl_groups_.insert(std::pair(group_id, listener));
pending_listeners_.erase(pending_listeners_.begin());
LOGINFO("Got listener for group_id={} replica={}", boost::uuids::to_string(group_id), replica_num_);
return listener;
}

Expand Down Expand Up @@ -346,6 +354,7 @@ class HSReplTestHelper : public HSTestHelper {
std::string name_;
std::vector< std::string > args_;
char** argv_;
uint32_t num_replicas_;

std::vector< homestore::dev_info > dev_list_;

Expand Down
Loading

0 comments on commit e98bb96

Please sign in to comment.