Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GC_REPL_REQ Based on DSN to Prevent Resource Leaks #576

Merged
merged 11 commits into from
Nov 6, 2024
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.4"
version = "6.5.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
friend class SoloReplDev;

public:
repl_req_ctx() {}
repl_req_ctx() { m_start_time = Clock::now(); }
virtual ~repl_req_ctx();
void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size);
Expand Down
103 changes: 78 additions & 25 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,9 +1191,22 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
entries.size());

auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
for (unsigned long i=0; i < entries.size(); i++) {
auto& entry = entries[i];
auto lsn = start_lsn + i;
auto term = entry->get_term();
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
// skipping localize for already committed log(dup), they anyway will be discard
// by nuraft before append_log.
if (lsn <= last_commit_lsn) {
JacksonYao287 marked this conversation as resolved.
Show resolved Hide resolved
RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}",
term, lsn, last_commit_lsn);
continue;
}
// Those LSNs already in logstore but not yet committed, will be dedup here,
// applier_create_req will return same req as previous one
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down Expand Up @@ -1265,39 +1278,79 @@ cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) {
void RaftReplDev::cp_cleanup(CP*) {}

void RaftReplDev::gc_repl_reqs() {
std::vector< int64_t > expired_keys;
m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
auto cur_dsn = m_next_dsn.load();
if (cur_dsn != 0) cur_dsn = cur_dsn - 1;
// On follower, any DSN below cur_dsn , should already be commited.
// That implies the rreq should already be removed from m_repl_key_req_map
std::vector< repl_req_ptr_t > expired_rreqs;

auto req_map_size = m_repl_key_req_map.size();
RD_LOGW("m_repl_key_req_map size is {};", req_map_size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the log.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to LOGI, this logging is helpful as we hit mem-leak twice around same place that in certain cases we dont remove rreq from m_repl_key_req_map. The first one is what you found and fixed that we forgot to remove in on_commit, this patch is the second time...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better keep a metrics of this repl related map's of count and total memory usage.

for (auto [key, rreq] : m_repl_key_req_map) {
// FIXME: Skipping proposer for now, the DSN in proposer increased in proposing stage, not when commit().
// Need other mechanism.
if (rreq->is_proposer()) {
RD_LOGD("Skipping rreq=[{}] due to is_proposer, elapsed_time_sec{};", rreq->to_string(), get_elapsed_time_sec(rreq->created_time()));
// don't clean up proposer's request
return;
continue;
}

if (rreq->dsn() < cur_dsn ) {
RD_LOGD("legacy req with commited DSN, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}", rreq->to_string(), rreq->dsn(),
cur_dsn, cur_dsn - rreq->dsn());
// FIXME: Wait till the rreq expired is obviously safer, though as commited request will
// be removed from map in on_commit(), we probably don't need wait till expired.
if (rreq->is_expired()) {
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved
RD_LOGD("Expired rreq =[{}], elapsed_time_sec {} ", rreq->to_string(), get_elapsed_time_sec(rreq->created_time()));
expired_rreqs.push_back(rreq);
}
}
}
int sm_req_cnt = 0;
// FIXME: we ensured data written before appending log to log store, in which we add rreq to state_machine
// and during pre-commit/commit we retrieve rreq from state_machine. Removing requests outside of state
// machine is risky.
// Below logs are logging only, can be removed once we get more confidence.
m_state_machine->iterate_repl_reqs([this, cur_dsn, &sm_req_cnt](auto key, auto rreq) {
sm_req_cnt++;
if (rreq->is_proposer()) {
RD_LOGD("Skipping rreq=[{}] due to is_proposer, elapsed_time_sec{};", rreq->to_string(),
get_elapsed_time_sec(rreq->created_time()));
// don't clean up proposer's request
return;
}
if (rreq->dsn() < cur_dsn) {
RD_LOGD("legacy req, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}", rreq->to_string(), rreq->dsn(),
cur_dsn, cur_dsn - rreq->dsn());
}
if (rreq->is_expired()) {
expired_keys.push_back(key);
RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(),
get_elapsed_time_sec(rreq->created_time()));
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved

// do garbage collection
// 1. free the allocated blocks
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("blkid={} freed successfully", blkid.to_string());
});
}

// 2. remove from the m_repl_key_req_map
// handle_error during fetch data response might have already removed the rreq from the this map
if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(rreq->rkey());
}
}
});

for (auto const& l : expired_keys) {
m_state_machine->unlink_lsn_to_req(l);
RD_LOGW("state_machine req map size is {};", sm_req_cnt);

for (auto removing_rreq : expired_rreqs) {
// do garbage collection
// 1. free the allocated blocks
RD_LOGI("Removing rreq [{}]", removing_rreq->to_string());
if (removing_rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = removing_rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("GC rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
// 2. remove from the m_repl_key_req_map
if (m_repl_key_req_map.find(removing_rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(removing_rreq->rkey());
}
// 3. remove from state-machine
if (removing_rreq->has_state(repl_req_state_t::LOG_FLUSHED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are iterating, we delete or unlink from the same map. Is it safe ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I didnt get this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iterate_repl_reqs is iterating through m_lsn_req_map and unlink is erasing it from m_lsn_req_map. Its not iterator so it looks safe.

RD_LOGW("Removing rreq [{}] from state machine, it is risky")
m_state_machine->unlink_lsn_to_req(removing_rreq->lsn());
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
Loading