Ensure Consistent LSN Before Opening for Traffic in Raft Group
We identified a gap that appears when a majority of the members of a Raft group
are down. To save IO operations, we do not persist last_commit_idx on every
commit but only at regular intervals. Consequently, after a reboot the persisted
value may lag behind the latest commit, leaving some entries in the log store
waiting for re-commitment.

For instance, if we committed up to LSN 103 but only persisted the commit index
up to LSN 100, then LSNs 101-103 will remain in the log-store, awaiting
re-commitment from the leader. If all members restart after a disaster, they
face the following state:

- S1: commit_idx 100, last_log {idx = 105, term = 1}
- S2: commit_idx 100, last_log {idx = 103, term = 1}
- S3: commit_idx 100, last_log {idx = 103, term = 1}

If S1 opens for traffic at this point, previously committed LSN 102 might
return NOT_FOUND to clients due to the uncommitted state.
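
The state above can be reproduced with a small model. The following is a
minimal sketch, not HomeStore code: `ReplicaModel`, `persist_interval`, and
`simulate_restart` are hypothetical names, and the commit index is assumed to
be persisted on every 100th commit purely so that the numbers match the
example.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of one replica; none of these names come from HomeStore.
struct ReplicaModel {
    int64_t commit_idx{0};           // in-memory commit index
    int64_t persisted_commit_idx{0}; // last commit index written to disk
    int64_t last_log_idx{0};         // last entry in the durable log store
    int64_t persist_interval{100};   // assumed: persist every N-th commit

    void append(int64_t lsn) { last_log_idx = lsn; }

    void commit(int64_t lsn) {
        assert(lsn <= last_log_idx && "cannot commit an entry that is not logged");
        commit_idx = lsn;
        if (lsn % persist_interval == 0) { persisted_commit_idx = lsn; }
    }

    // A restart loses the in-memory commit index and falls back to the persisted one.
    void restart() { commit_idx = persisted_commit_idx; }

    // Entries that are durable in the log but must be committed again.
    int64_t pending_recommit() const { return last_log_idx - commit_idx; }
};

// Appends `appended` entries, commits the first `committed`, then restarts.
inline ReplicaModel simulate_restart(int64_t appended, int64_t committed, int64_t interval) {
    assert(interval > 0);
    ReplicaModel r;
    r.persist_interval = interval;
    for (int64_t lsn = 1; lsn <= appended; ++lsn) { r.append(lsn); }
    for (int64_t lsn = 1; lsn <= committed; ++lsn) { r.commit(lsn); }
    r.restart();
    return r;
}
```

With 105 appended entries, 103 commits, and an interval of 100,
`simulate_restart(105, 103, 100)` ends with commit_idx 100 and last_log idx 105,
matching S1. LSNs 101-103 were committed before the restart and 104-105 never
were, but after the restart the replica can no longer tell the two groups apart.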

Proposed Solution:
- Mark last_log_idx as `traffic_ready_lsn` in the BECOME_LEADER callback. In the
  example above, it is 105 if S1 becomes the leader.
- The leader will not accept IO until it has committed up to this
  `traffic_ready_lsn` (105), ensuring correctness by over-committing.
- The HO will call `repl_dev->is_ready_for_traffic()` for each IO.
- On followers, `traffic_ready_lsn` is zero, so the check always passes.
- On the leader, all requests are rejected until it has committed up to
  `traffic_ready_lsn` (105 in this example).
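
The gating rule in the list above can be sketched in isolation. This is a
simplified illustration under stated assumptions, not the code in this commit:
`TrafficGate`, `lsn_t`, and the `on_*` method names are hypothetical, and the
last log index is passed in as a plain argument instead of being read from the
Raft server.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

using lsn_t = int64_t; // hypothetical alias for a log sequence number

// Hypothetical stand-in for a replica; only the gating logic is modelled.
class TrafficGate {
public:
    // On becoming leader, remember the last log index as the gate.
    void on_become_leader(lsn_t last_log_idx) {
        lsn_t expected = 0;
        bool const swapped = m_traffic_ready_lsn.compare_exchange_strong(expected, last_log_idx);
        // A follower always carries a zero gate, so the exchange should succeed.
        assert(swapped && "gate should be zero before becoming leader");
        (void)swapped; // silence unused warning when asserts are compiled out
    }

    // Followers never gate traffic, so the gate is reset to zero.
    void on_become_follower() { m_traffic_ready_lsn.store(0); }

    // Record that everything up to `lsn` has been committed.
    void on_commit(lsn_t lsn) { m_commit_upto_lsn.store(lsn); }

    // Ready once the commit index has caught up with the gate.
    bool is_ready_for_traffic() const { return m_commit_upto_lsn.load() >= m_traffic_ready_lsn.load(); }

private:
    std::atomic< lsn_t > m_commit_upto_lsn{0};
    std::atomic< lsn_t > m_traffic_ready_lsn{0};
};
```

In the S1 example, a gate of 105 keeps the replica closed while the commit
index moves from 100 through 104 and opens it at 105. Because the gate is the
last log index rather than the last known commit, entries 104-105 are committed
too, which is the over-committing the message refers to.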

Signed-off-by: Xiaoxi Chen <[email protected]>
xiaoxichen committed Dec 19, 2024
1 parent 707c111 commit 0de97cd
Showing 8 changed files with 67 additions and 15 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
-version = "6.5.27"
+version = "6.5.28"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
@@ -54,7 +54,7 @@ def build_requirements(self):
def requirements(self):
self.requires("iomgr/[^11.3]@oss/master", transitive_headers=True)
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-self.requires("nuraft_mesg/[^3.7]@oss/main", transitive_headers=True)
+self.requires("nuraft_mesg/[^3.7.1]@oss/main", transitive_headers=True)

self.requires("farmhash/cci.20190513@", transitive_headers=True)
if self.settings.arch in ['x86', 'x86_64']:
9 changes: 7 additions & 2 deletions src/include/homestore/replication/repl_dev.h
@@ -124,8 +124,9 @@ struct snapshot_obj {
bool is_last_obj{false};
};

-//HomeStore has some meta information to be transmitted during the baseline resync,
-//Although now only dsn needs to be synced, this structure is defined as a general message, and we can easily add data if needed in the future.
+// HomeStore has some meta information to be transmitted during the baseline resync,
+// Although now only dsn needs to be synced, this structure is defined as a general message, and we can easily add data
+// if needed in the future.
struct snp_repl_dev_data {
uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC};
uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1};
@@ -468,6 +469,10 @@ class ReplDev {
/// @return last_commit_lsn
virtual repl_lsn_t get_last_commit_lsn() const = 0;

+/// @brief if this replica is ready for accepting client IO.
+/// @return true if ready, false otherwise
+virtual bool is_ready_for_traffic() const = 0;
+
virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
3 changes: 1 addition & 2 deletions src/lib/replication/repl_dev/common.cpp
@@ -174,8 +174,7 @@ void repl_req_ctx::release_data() {
m_buf_for_unaligned_data = sisl::io_blob_safe{};
if (m_pushed_data) {
LOGTRACEMOD(replication, "m_pushed_data addr={}, m_rkey={}, m_lsn={}",
-static_cast<void *>(m_pushed_data.get()),
-m_rkey.to_string(), m_lsn);
+static_cast< void* >(m_pushed_data.get()), m_rkey.to_string(), m_lsn);
m_pushed_data->send_response();
m_pushed_data = nullptr;
}
23 changes: 18 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -1237,8 +1237,7 @@ void RaftReplDev::leave() {
m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
}

-std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nuraft::cb_func::Type type,
-nuraft::cb_func::Param* param) {
+nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type, nuraft::cb_func::Param* param) {
auto ret = nuraft::cb_func::ReturnCode::Ok;

switch (type) {
@@ -1283,7 +1282,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
// If there is nothing we can accept (i==0), that means we are waiting for commit
// of previous lsn, set it to 1 in this case.
m_state_machine->reset_next_batch_size_hint(std::max(1ul, i));
-return {true, nuraft::cb_func::ReturnCode::ReturnNull};
+return nuraft::cb_func::ReturnCode::ReturnNull;
}
reqs->emplace_back(std::move(req));
}
@@ -1298,7 +1297,21 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); }
-return {true, ret};
+return ret;
}
+case nuraft::cb_func::Type::JoinedCluster:
+RD_LOGD("Raft channel: Received JoinedCluster, implies become_follower");
+become_follower_cb();
+return nuraft::cb_func::ReturnCode::Ok;
+case nuraft::cb_func::Type::BecomeFollower: {
+RD_LOGD("Raft channel: Received BecomeFollower");
+become_follower_cb();
+return nuraft::cb_func::ReturnCode::Ok;
+}
+case nuraft::cb_func::Type::BecomeLeader: {
+RD_LOGD("Raft channel: Received BecomeLeader");
+become_leader_cb();
+return nuraft::cb_func::ReturnCode::Ok;
+}

// RemovedFromCluster will be handled in nuraft_mesg::generic_raft_event_handler where leave() is called
@@ -1307,7 +1320,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
default:
break;
}
-return {false, ret};
+return nuraft::cb_func::ReturnCode::Ok;
}

void RaftReplDev::flush_durable_commit_lsn() {
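
The role-change handling added to this file boils down to a switch that maps
Raft events onto two callbacks. A minimal, self-contained sketch of that shape
follows; `EventType`, `ReturnCode`, `RoleCallbacks`, and `dispatch_role_event`
are hypothetical stand-ins, not the nuraft or HomeStore types.

```cpp
#include <functional>

// Hypothetical stand-ins for the Raft event and return-code enums.
enum class EventType { JoinedCluster, BecomeFollower, BecomeLeader, Other };
enum class ReturnCode { Ok };

// Callbacks invoked on role changes; either may be left empty.
struct RoleCallbacks {
    std::function< void() > on_leader;
    std::function< void() > on_follower;
};

// Routes role-change events to the matching callback and ignores everything else.
inline ReturnCode dispatch_role_event(EventType type, const RoleCallbacks& cbs) {
    switch (type) {
    case EventType::JoinedCluster: // joining a cluster is treated as becoming a follower
    case EventType::BecomeFollower:
        if (cbs.on_follower) { cbs.on_follower(); }
        return ReturnCode::Ok;
    case EventType::BecomeLeader:
        if (cbs.on_leader) { cbs.on_leader(); }
        return ReturnCode::Ok;
    default:
        return ReturnCode::Ok; // unrelated events fall through untouched
    }
}
```

Treating JoinedCluster like BecomeFollower mirrors the log message in the diff
("implies become_follower"): a replica that has just joined resets its gate to
zero in the same way as one that stepped down.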
29 changes: 27 additions & 2 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -137,6 +137,10 @@ class RaftReplDev : public ReplDev,

std::atomic< repl_lsn_t > m_commit_upto_lsn{0}; // LSN which was lastly written, to track flushes
std::atomic< repl_lsn_t > m_compact_lsn{0}; // LSN upto which it was compacted, it is used to track where to
+// `m_traffic_ready_lsn` holds the Log Sequence Number (LSN) up to which the state machine
+// must have committed before accepting traffic. This threshold ensures that all potentially
+// committed log entries are committed before incoming requests are handled.
+std::atomic< repl_lsn_t > m_traffic_ready_lsn{0};

std::mutex m_sb_mtx; // Lock to protect the repl dev superblock

@@ -187,6 +191,13 @@ class RaftReplDev : public ReplDev,
bool is_destroy_pending() const;
bool is_destroyed() const;
Clock::time_point destroyed_time() const { return m_destroyed_time; }
+bool is_ready_for_traffic() const {
+auto committed_lsn = m_commit_upto_lsn.load();
+auto gate = m_traffic_ready_lsn.load();
+bool ready = committed_lsn >= gate;
+if (!ready) { RD_LOGD("Not yet ready for traffic, committed to {} but gate is {}", committed_lsn, gate); }
+return ready;
+}

//////////////// Accessor/shortcut methods ///////////////////////
nuraft_mesg::repl_service_ctx* group_msg_service();
@@ -206,6 +217,20 @@ class RaftReplDev : public ReplDev,
cshared< ReplDevCPContext > get_cp_ctx(CP* cp);
void cp_cleanup(CP* cp);
void become_ready();
+void become_leader_cb() {
+auto new_gate = raft_server()->get_last_log_idx();
+repl_lsn_t existing_gate = 0;
+if (!m_traffic_ready_lsn.compare_exchange_strong(existing_gate, new_gate)) {
+// was a follower, m_traffic_ready_lsn should be zero on follower.
+RD_REL_ASSERT(existing_gate == 0, "existing gate should be zero");
+}
+RD_LOGD("become_leader_cb: setting traffic_ready_lsn from {} to {}", existing_gate, new_gate);
+};
+void become_follower_cb() {
+// m_traffic_ready_lsn should be zero on follower.
+m_traffic_ready_lsn.store(0);
+RD_LOGD("become_follower_cb setting traffic_ready_lsn to 0");
+}

/// @brief This method is called when the data journal is compacted
///
@@ -270,8 +295,8 @@ class RaftReplDev : public ReplDev,
std::shared_ptr< nuraft::state_machine > get_state_machine() override;
void permanent_destroy() override;
void leave() override;
-std::pair< bool, nuraft::cb_func::ReturnCode > handle_raft_event(nuraft::cb_func::Type,
-nuraft::cb_func::Param*) override;
+
+nuraft::cb_func::ReturnCode raft_event(nuraft::cb_func::Type, nuraft::cb_func::Param*) override;

private:
shared< nuraft::log_store > data_journal() { return m_data_journal; }
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -385,7 +385,7 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx());
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto res = m_rd.m_listener->apply_snapshot(snp_ctx);
-//make sure the changes are flushed.
+// make sure the changes are flushed.
hs()->cp_mgr().trigger_cp_flush(true /* force */).get();
return res;
}
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
@@ -53,6 +53,7 @@ class SoloReplDev : public ReplDev {
std::vector< peer_info > get_replication_status() const override {
return std::vector< peer_info >{peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0}};
}
+bool is_ready_for_traffic() const override { return true; }

uuid_t group_id() const override { return m_group_id; }

11 changes: 10 additions & 1 deletion src/tests/test_common/raft_repl_test_base.hpp
@@ -350,6 +350,10 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void validate_db_data() {
g_helper->runner().set_num_tasks(inmem_db_.size());
+while (!repl_dev()->is_ready_for_traffic()) {
+LOGINFO("not yet ready for traffic, waiting");
+std::this_thread::sleep_for(std::chrono::milliseconds{500});
+}

LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them",
boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size());
@@ -554,14 +558,19 @@ class RaftReplDevTestBase : public testing::Test {
if (dbs_[0]->repl_dev() == nullptr) return;

do {
-auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id();
+auto repl_dev = dbs_[0]->repl_dev();
+auto leader_uuid = repl_dev->get_leader_id();

if (leader_uuid.is_nil()) {
LOGINFO("Waiting for leader to be elected");
std::this_thread::sleep_for(std::chrono::milliseconds{500});
} else if (leader_uuid == g_helper->my_replica_id()) {
LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries,
boost::uuids::to_string(g_helper->my_replica_id()));
+if (!repl_dev->is_ready_for_traffic()) {
+LOGINFO("leader is not yet ready for traffic, waiting");
+std::this_thread::sleep_for(std::chrono::milliseconds{500});
+}
auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
g_helper->runner().set_num_tasks(num_entries);
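
The test helpers in this file poll `is_ready_for_traffic()` with a fixed 500 ms
sleep. A bounded variant of that wait could look like the sketch below. It is
an illustration only: `wait_until_ready` and its parameters are hypothetical
and not part of this commit, and the readiness check is passed in as a callable
so the helper does not depend on any HomeStore type.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Polls `is_ready` until it returns true or `timeout` elapses.
// Returns true if the predicate became true, false if the deadline passed first.
inline bool wait_until_ready(const std::function< bool() >& is_ready, std::chrono::milliseconds timeout,
                             std::chrono::milliseconds poll_interval = std::chrono::milliseconds{500}) {
    assert(poll_interval.count() > 0 && "poll interval must be positive");
    auto const deadline = std::chrono::steady_clock::now() + timeout;
    while (!is_ready()) {
        if (std::chrono::steady_clock::now() >= deadline) { return false; }
        std::this_thread::sleep_for(poll_interval);
    }
    return true;
}
```

A caller would wrap the replica check in a lambda, for example
`wait_until_ready([&] { return repl_dev->is_ready_for_traffic(); }, std::chrono::seconds{30})`,
and fail the test if the call returns false instead of waiting indefinitely.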
