Skip to content

Commit

Permalink
Implement GC_REPL_REQ Based on DSN to Prevent Resource Leaks (#576)
Browse files Browse the repository at this point in the history
* Implement GC_REPL_REQ Based on DSN to Prevent Resource Leaks

This commit introduces a mechanism to garbage collect (GC) replication requests
(rreqs) that may hang indefinitely, thereby consuming memory and disk resources
unnecessarily. These rreqs can enter a hanging state under several
circumstances, as outlined below:

1. Scenario with Delayed Commit:
   - Follower F1 receives LSN 100 and DSN 104 from Leader L1 and takes longer
     than the raft timeout to precommit/commit it.
   - L1 resends LSN 100, causing F1 to fetch the data again. Since LSN 100 was
     committed in a previous attempt, this log entry is skipped, leaving the
     rreq hanging indefinitely.

2. Scenario with Leader Failure Before Data Completion:
   - Follower F1 receives LSN 100 from L1, but before all data is fetched/pushed,
     L1 fails and L2 becomes the new leader.
   - L2 resends LSN 100 with L2 as the new originator. F1 proceeds with the new
     rreq and commits it, but the initial rreq from L1 hangs indefinitely as it
     cannot fetch data from the new leader L2.

3. Scenario with Leader Failure After Data Write:
   - Follower F1 receives data (DSN 104) from L1 and writes it. Before the log of
     LSN 100 reaches F1, L1 fails and L2 becomes the new leader.
   - L2 resends LSN 100 to F1, and F1 fetches DSN 104 from L2, leaving the
     original rreq hanging.

This garbage collection process cleans up based on DSN. Any rreqs in
`m_repl_key_req_map`, whose DSN is already committed (`rreq->dsn <
repl_dev->m_next_dsn`), will be GC'd. This is safe on the follower side, as the
follower updates `m_next_dsn` during commit. Any DSN below `cur_dsn` should
already be committed, implying that the rreq should already be removed from
`m_repl_key_req_map`.

On the leader side, since `m_next_dsn` is updated when sending out the proposal,
it is not safe to clean up based on `m_next_dsn`. Therefore, we explicitly skip
the leader in this GC process.



Skipping localize raft logs we already committed.

Leader may send duplicate raft logs, if we localize them
unconditionally duplicate data will be written to chunk during
fetch_data.

It is safe for us to skip those logs that already committed,
there is no way those LSN can be over-written.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen authored Nov 6, 2024
1 parent 804cd6b commit 50f42ff
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.6"
version = "6.5.7"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
friend class SoloReplDev;

public:
repl_req_ctx() {}
repl_req_ctx() { m_start_time = Clock::now(); }
virtual ~repl_req_ctx();
void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size);
Expand Down
97 changes: 71 additions & 26 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
// Remove the request from lsn map.
m_state_machine->unlink_lsn_to_req(rreq->lsn());
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);

auto cur_dsn = m_next_dsn.load(std::memory_order_relaxed);
while (cur_dsn <= rreq->dsn()) {
Expand Down Expand Up @@ -1191,9 +1191,22 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
entries.size());

auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
for (unsigned long i = 0; i < entries.size(); i++) {
auto& entry = entries[i];
auto lsn = start_lsn + i;
auto term = entry->get_term();
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
// skipping localize for already committed log(dup), they anyway will be discard
// by nuraft before append_log.
if (lsn <= last_commit_lsn) {
RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn,
last_commit_lsn);
continue;
}
// Those LSNs already in logstore but not yet committed, will be dedup here,
// applier_create_req will return same req as previous one
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down Expand Up @@ -1265,39 +1278,71 @@ cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) {
void RaftReplDev::cp_cleanup(CP*) {}

void RaftReplDev::gc_repl_reqs() {
std::vector< int64_t > expired_keys;
m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
auto cur_dsn = m_next_dsn.load();
if (cur_dsn != 0) cur_dsn = cur_dsn - 1;
// On follower, DSN below cur_dsn should very likely be commited.
// It is not guaranteed because DSN and LSN are generated separately,
// DSN in async_alloc_write before pushing data, LSN later when
// proposing to raft. Two simultaneous write requests on leader can have
// <LSN=100, DSN=102> and <LSN=101, DSN =101> during the window.
std::vector< repl_req_ptr_t > expired_rreqs;

auto req_map_size = m_repl_key_req_map.size();
RD_LOGI("m_repl_key_req_map size is {};", req_map_size);
for (auto [key, rreq] : m_repl_key_req_map) {
// FIXME: Skipping proposer for now, the DSN in proposer increased in proposing stage, not when commit().
// Need other mechanism.
if (rreq->is_proposer()) {
// don't clean up proposer's request
continue;
}
if (rreq->dsn() < cur_dsn && rreq->is_expired()) {
// The DSN can be out of order, wait till rreq expired.
RD_LOGD("legacy req with commited DSN, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}, elapsed_time_sec {}",
rreq->to_string(), rreq->dsn(), cur_dsn, cur_dsn - rreq->dsn(),
get_elapsed_time_sec(rreq->created_time()));
expired_rreqs.push_back(rreq);
}
}
int sm_req_cnt = 0;
// FIXME: we ensured data written before appending log to log store, in which we add rreq to state_machine
// and during pre-commit/commit we retrieve rreq from state_machine. Removing requests outside of state
// machine is risky.
// Below logs are logging only, can be removed once we get more confidence.
m_state_machine->iterate_repl_reqs([this, cur_dsn, &sm_req_cnt](auto key, auto rreq) {
sm_req_cnt++;
if (rreq->is_proposer()) {
// don't clean up proposer's request
return;
}

if (rreq->is_expired()) {
expired_keys.push_back(key);
RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(),
RD_LOGD("StateMachine: rreq=[{}] is expired, elapsed_time_sec{};", rreq->to_string(),
get_elapsed_time_sec(rreq->created_time()));

// do garbage collection
// 1. free the allocated blocks
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("blkid={} freed successfully", blkid.to_string());
});
}

// 2. remove from the m_repl_key_req_map
// handle_error during fetch data response might have already removed the rreq from the this map
if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(rreq->rkey());
}
}
});
RD_LOGI("state_machine req map size is {};", sm_req_cnt);

for (auto const& l : expired_keys) {
m_state_machine->unlink_lsn_to_req(l);
for (auto removing_rreq : expired_rreqs) {
// once log flushed, the commit progress controlled by raft
if (removing_rreq->has_state(repl_req_state_t::LOG_FLUSHED)) {
RD_LOGI("Skipping GC rreq [{}] because it is in state machine", removing_rreq->to_string());
continue;
}
// do garbage collection
// 1. free the allocated blocks
RD_LOGI("Removing rreq [{}]", removing_rreq->to_string());
if (removing_rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = removing_rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("GC rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
// 2. remove from the m_repl_key_req_map
if (m_repl_key_req_map.find(removing_rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(removing_rreq->rkey());
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ uint64_t RaftStateMachine::last_commit_index() {

void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
auto const it = m_lsn_req_map.find(lsn);
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) {
// it is possible a LSN mapped to different rreq in history
// due to log overwritten. Verify the rreq before removing
auto deleted = m_lsn_req_map.erase_if_equal(lsn, rreq);
if (deleted) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RaftStateMachine : public nuraft::state_machine {
repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry);
repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry);
void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn);
void unlink_lsn_to_req(int64_t lsn);
void unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq);
repl_req_ptr_t lsn_to_req(int64_t lsn);
nuraft_mesg::repl_service_ctx* group_msg_service();

Expand Down

0 comments on commit 50f42ff

Please sign in to comment.