Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GC_REPL_REQ Based on DSN to Prevent Resource Leaks #576

Merged
merged 11 commits into from
Nov 6, 2024
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.5"
version = "6.5.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
friend class SoloReplDev;

public:
repl_req_ctx() {}
repl_req_ctx() { m_start_time = Clock::now(); }
virtual ~repl_req_ctx();
void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size);
Expand Down
99 changes: 72 additions & 27 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
// Remove the request from lsn map.
m_state_machine->unlink_lsn_to_req(rreq->lsn());
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);

auto cur_dsn = m_next_dsn.load(std::memory_order_relaxed);
while (cur_dsn <= rreq->dsn()) {
Expand Down Expand Up @@ -1191,9 +1191,22 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
entries.size());

auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
for (unsigned long i = 0; i < entries.size(); i++) {
auto& entry = entries[i];
auto lsn = start_lsn + i;
auto term = entry->get_term();
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
// Skip localization for already-committed (duplicate) log entries; nuraft will
// discard them before append_log anyway.
if (lsn <= last_commit_lsn) {
JacksonYao287 marked this conversation as resolved.
Show resolved Hide resolved
RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn,
last_commit_lsn);
continue;
}
// Those LSNs already in logstore but not yet committed, will be dedup here,
// applier_create_req will return same req as previous one
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down Expand Up @@ -1265,39 +1278,71 @@ cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) {
void RaftReplDev::cp_cleanup(CP*) {}

void RaftReplDev::gc_repl_reqs() {
std::vector< int64_t > expired_keys;
m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
auto cur_dsn = m_next_dsn.load();
if (cur_dsn != 0) cur_dsn = cur_dsn - 1;
// On follower, any DSN below cur_dsn , should already be commited.
// That implies the rreq should already be removed from m_repl_key_req_map
std::vector< repl_req_ptr_t > expired_rreqs;

auto req_map_size = m_repl_key_req_map.size();
RD_LOGI("m_repl_key_req_map size is {};", req_map_size);
for (auto [key, rreq] : m_repl_key_req_map) {
// FIXME: Skipping proposer for now, the DSN in proposer increased in proposing stage, not when commit().
// Need other mechanism.
if (rreq->is_proposer()) {
// don't clean up proposer's request
continue;
}
if (rreq->dsn() < cur_dsn) {
RD_LOGD("legacy req with commited DSN, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}, elapsed_time_sec {}",
rreq->to_string(), rreq->dsn(), cur_dsn, cur_dsn - rreq->dsn(),
get_elapsed_time_sec(rreq->created_time()));
expired_rreqs.push_back(rreq);
}
}
int sm_req_cnt = 0;
// FIXME: we ensured data written before appending log to log store, in which we add rreq to state_machine
// and during pre-commit/commit we retrieve rreq from state_machine. Removing requests outside of state
// machine is risky.
// Below logs are logging only, can be removed once we get more confidence.
m_state_machine->iterate_repl_reqs([this, cur_dsn, &sm_req_cnt](auto key, auto rreq) {
sm_req_cnt++;
if (rreq->is_proposer()) {
// don't clean up proposer's request
return;
}

if (rreq->dsn() < cur_dsn) {
RD_LOGD("StateMachine: legacy req, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}", rreq->to_string(),
rreq->dsn(), cur_dsn, cur_dsn - rreq->dsn());
}
if (rreq->is_expired()) {
expired_keys.push_back(key);
RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(),
RD_LOGD("StateMachine: rreq=[{}] is expired, elapsed_time_sec{};", rreq->to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not adding to expired_rreqs ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is very risky to remove a rreq from state-machine as the time you check it , it might in the middle of "commit" or "pre-commit" which will causing NPE/assert.

As we ensure logs are added to state-machine after data written, I dont find a case where we can have a request expiring in state-machine , so I am intentionally to remove this for loop (as said in the FIXME), but trying to verify through logging to get confidence.

get_elapsed_time_sec(rreq->created_time()));
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved

// do garbage collection
// 1. free the allocated blocks
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("blkid={} freed successfully", blkid.to_string());
});
}

// 2. remove from the m_repl_key_req_map
// handle_error during fetch data response might have already removed the rreq from the this map
if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(rreq->rkey());
}
}
});

for (auto const& l : expired_keys) {
m_state_machine->unlink_lsn_to_req(l);
RD_LOGI("state_machine req map size is {};", sm_req_cnt);

for (auto removing_rreq : expired_rreqs) {
// do garbage collection
// 1. free the allocated blocks
RD_LOGI("Removing rreq [{}]", removing_rreq->to_string());
if (removing_rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = removing_rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
RD_LOGD("GC rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
// 2. remove from the m_repl_key_req_map
if (m_repl_key_req_map.find(removing_rreq->rkey()) != m_repl_key_req_map.end()) {
m_repl_key_req_map.erase(removing_rreq->rkey());
}
// 3. remove from state-machine
if (removing_rreq->has_state(repl_req_state_t::LOG_FLUSHED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are iterating, we delete or unlink from the same map. Is it safe ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I didnt get this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iterate_repl_reqs is iterating through m_lsn_req_map and unlink is erasing it from m_lsn_req_map. Its not iterator so it looks safe.

RD_LOGW("Removing rreq [{}] from state machine, it is risky")
m_state_machine->unlink_lsn_to_req(removing_rreq->lsn(), removing_rreq);
}
}
}

Expand Down
13 changes: 10 additions & 3 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,18 @@ uint64_t RaftStateMachine::last_commit_index() {

void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) {
auto const it = m_lsn_req_map.find(lsn);
// it is possible a LSN mapped to different rreq in history
// due to log overwritten. Verify the rreq before removing
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
if (it->second == rreq) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might a very small case that some change happens between line 229 and line 231.
my suggestion is using erase_if_equal instead
https://github.com/facebook/folly/blob/30a4e783a7618f17a5b24048625872e363068887/folly/concurrency/ConcurrentHashMap.h#L497

} else {
RD_LOGC("Erasing lsn {} pointing to rreq{} differnt with providing rreq {}", lsn, it->second->to_string(),
rreq->to_string());
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RaftStateMachine : public nuraft::state_machine {
repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry);
repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry);
void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn);
void unlink_lsn_to_req(int64_t lsn);
void unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq);
repl_req_ptr_t lsn_to_req(int64_t lsn);
nuraft_mesg::repl_service_ctx* group_msg_service();

Expand Down
Loading