Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GC_REPL_REQ Based on DSN to Prevent Resource Leaks #576

Merged
merged 11 commits into from
Nov 6, 2024
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.6"
version = "6.5.7"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
friend class SoloReplDev;

public:
repl_req_ctx() {}
repl_req_ctx() { m_start_time = Clock::now(); }
virtual ~repl_req_ctx();
void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size);
Expand Down
97 changes: 71 additions & 26 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
// Remove the request from lsn map.
m_state_machine->unlink_lsn_to_req(rreq->lsn());
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);

auto cur_dsn = m_next_dsn.load(std::memory_order_relaxed);
while (cur_dsn <= rreq->dsn()) {
Expand Down Expand Up @@ -1191,9 +1191,22 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
entries.size());

auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
for (unsigned long i = 0; i < entries.size(); i++) {
auto& entry = entries[i];
auto lsn = start_lsn + i;
auto term = entry->get_term();
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
// skipping localize for already committed log(dup), they anyway will be discard
// by nuraft before append_log.
if (lsn <= last_commit_lsn) {
JacksonYao287 marked this conversation as resolved.
Show resolved Hide resolved
RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn,
last_commit_lsn);
continue;
}
// Those LSNs already in logstore but not yet committed, will be dedup here,
// applier_create_req will return same req as previous one
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down Expand Up @@ -1265,39 +1278,71 @@ cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) {
void RaftReplDev::cp_cleanup(CP*) {}

void RaftReplDev::gc_repl_reqs() {
std::vector< int64_t > expired_keys;
m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
auto cur_dsn = m_next_dsn.load();
if (cur_dsn != 0) cur_dsn = cur_dsn - 1;
// On follower, DSN below cur_dsn should very likely be commited.
// It is not guaranteed because DSN and LSN are generated separately,
// DSN in async_alloc_write before pushing data, LSN later when
// proposing to raft. Two simultaneous write requests on leader can have
// <LSN=100, DSN=102> and <LSN=101, DSN =101> during the window.
std::vector< repl_req_ptr_t > expired_rreqs;

auto req_map_size = m_repl_key_req_map.size();
RD_LOGI("m_repl_key_req_map size is {};", req_map_size);
for (auto [key, rreq] : m_repl_key_req_map) {
// FIXME: Skipping proposer for now, the DSN in proposer increased in proposing stage, not when commit().
// Need other mechanism.
if (rreq->is_proposer()) {
// don't clean up proposer's request
continue;
}
if (rreq->dsn() < cur_dsn && rreq->is_expired()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can revisit here if we have better solution to accurately identify the garbage in the future.
for now, let`s go ahead and not block sm long run.

// The DSN can be out of order, wait till rreq expired.
RD_LOGD("legacy req with commited DSN, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}, elapsed_time_sec {}",
rreq->to_string(), rreq->dsn(), cur_dsn, cur_dsn - rreq->dsn(),
get_elapsed_time_sec(rreq->created_time()));
expired_rreqs.push_back(rreq);
}
}
int sm_req_cnt = 0;
// FIXME: we ensured data written before appending log to log store, in which we add rreq to state_machine
// and during pre-commit/commit we retrieve rreq from state_machine. Removing requests outside of state
// machine is risky.
// Below logs are logging only, can be removed once we get more confidence.
m_state_machine->iterate_repl_reqs([this, cur_dsn, &sm_req_cnt](auto key, auto rreq) {
sm_req_cnt++;
if (rreq->is_proposer()) {
// don't clean up proposer's request
return;
}

if (rreq->is_expired()) {
expired_keys.push_back(key);
RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(),
RD_LOGD("StateMachine: rreq=[{}] is expired, elapsed_time_sec{};", rreq->to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not adding to expired_rreqs ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is very risky to remove a rreq from state-machine as the time you check it , it might in the middle of "commit" or "pre-commit" which will causing NPE/assert.

As we ensure logs are added to state-machine after data written, I dont find a case where we can have a request expiring in state-machine , so I am intentionally to remove this for loop (as said in the FIXME), but trying to verify through logging to get confidence.

get_elapsed_time_sec(rreq->created_time()));
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved

// do garbage collection
// 1. free the allocated blocks
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("blkid={} freed successfully", blkid.to_string());
});
}

// 2. remove from the m_repl_key_req_map
// handle_error during fetch data response might have already removed the rreq from the this map
if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(rreq->rkey());
}
}
});
RD_LOGI("state_machine req map size is {};", sm_req_cnt);

for (auto const& l : expired_keys) {
m_state_machine->unlink_lsn_to_req(l);
for (auto removing_rreq : expired_rreqs) {
// once log flushed, the commit progress controlled by raft
if (removing_rreq->has_state(repl_req_state_t::LOG_FLUSHED)) {
RD_LOGI("Skipping GC rreq [{}] because it is in state machine", removing_rreq->to_string());
continue;
}
// do garbage collection
// 1. free the allocated blocks
RD_LOGI("Removing rreq [{}]", removing_rreq->to_string());
if (removing_rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = removing_rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("GC rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
// 2. remove from the m_repl_key_req_map
if (m_repl_key_req_map.find(removing_rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(removing_rreq->rkey());
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ uint64_t RaftStateMachine::last_commit_index() {

void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
auto const it = m_lsn_req_map.find(lsn);
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) {
// it is possible a LSN mapped to different rreq in history
// due to log overwritten. Verify the rreq before removing
auto deleted = m_lsn_req_map.erase_if_equal(lsn, rreq);
if (deleted) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RaftStateMachine : public nuraft::state_machine {
repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry);
repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry);
void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn);
void unlink_lsn_to_req(int64_t lsn);
void unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq);
repl_req_ptr_t lsn_to_req(int64_t lsn);
nuraft_mesg::repl_service_ctx* group_msg_service();

Expand Down
Loading