Skip to content

Commit

Permalink
Issue #279 Garbage Collection of Replication request on originator cr…
Browse files Browse the repository at this point in the history
…ash/reboot (#432)

* Issue 279: GarbageCollect Repl Reqs
  • Loading branch information
yamingk authored Jun 12, 2024
1 parent 6ba47be commit 37fa2bc
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 7 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.14"
version = "6.4.15"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
std::string to_string() const;
std::string to_compact_string() const;
Clock::time_point created_time() const { return m_start_time; }
bool is_expired() const;

/////////////////////// All Modifiers methods //////////////////

Expand Down
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ table Consensus {
// Timeout for data to be received after raft entry after which raft entry is rejected.
data_receive_timeout_ms: uint64 = 10000;

// Timeout in seconds after which a replication request (repl_req) is considered expired and becomes eligible for garbage collection.
repl_req_timeout_sec: uint32 = 300;

// Frequency to flush durable commit LSN in millis
flush_durable_commit_interval_ms: uint64 = 500;
}
Expand Down
7 changes: 6 additions & 1 deletion src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sisl/grpc/rpc_call.hpp>
#include <homestore/blkdata_service.hpp>
#include <homestore/replication/repl_dev.h>
#include <common/homestore_config.hpp>
#include "replication/repl_dev/common.h"
#include <libnuraft/nuraft.hxx>

Expand Down Expand Up @@ -196,4 +197,8 @@ std::string repl_req_ctx::to_compact_string() const {
enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state())));
}

} // namespace homestore
/**
 * Tells whether this request has been alive for longer than the configured request timeout.
 *
 * @return true when get_elapsed_time_sec(m_start_time) is strictly greater than the dynamic setting
 *         consensus.repl_req_timeout_sec, false otherwise.
 */
bool repl_req_ctx::is_expired() const {
    auto const elapsed_sec = get_elapsed_time_sec(m_start_time);
    auto const timeout_sec = HS_DYNAMIC_CONFIG(consensus.repl_req_timeout_sec);
    return elapsed_sec > timeout_sec;
}

} // namespace homestore
38 changes: 37 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -997,6 +997,42 @@ void RaftReplDev::cp_flush(CP*) {

// Checkpoint cleanup hook: the body is empty, so no work is done here and the CP argument is unused.
void RaftReplDev::cp_cleanup(CP*) {}

/**
 * Garbage collect the expired replication requests tracked for this repl dev.
 *
 * Visits every request known to the state machine. Requests proposed by this node are never collected. For every
 * other request that reports is_expired():
 *   1. its local blocks are freed asynchronously if the request is in BLK_ALLOCATED state,
 *   2. it is dropped from m_repl_key_req_map,
 *   3. its lsn is unlinked from the state machine once the iteration has finished.
 */
void RaftReplDev::gc_repl_reqs() {
    // LSNs are only collected during the walk and unlinked afterwards, so the state machine's map is not modified
    // by this function while it is being iterated.
    std::vector< int64_t > expired_keys;

    m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
        // don't clean up proposer's request, and leave anything that has not timed out yet
        if (rreq->is_proposer() || !rreq->is_expired()) { return; }

        expired_keys.push_back(key);
        RD_LOGD("rreq=[{}] is expired, cleaning up", rreq->to_compact_string());

        // 1. free the allocated blocks
        if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
            auto blkid = rreq->local_blkid();
            data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
                HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
                              blkid.to_string());
                RD_LOGD("blkid={} freed successfully", blkid.to_string());
            });
        }

        // 2. remove from the m_repl_key_req_map
        // handle_error during fetch data response might have already removed the rreq from this map; erasing by
        // key is a no-op in that case, so no separate find() is needed (it would only add a second lookup).
        m_repl_key_req_map.erase(rreq->rkey());
    });

    // 3. unlink the expired requests from the state machine's lsn -> request map
    for (auto const lsn : expired_keys) {
        m_state_machine->unlink_lsn_to_req(lsn);
    }
}

void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) {
// apply the log entry if the lsn is between checkpoint lsn and durable commit lsn
if (lsn < m_rd_sb->checkpoint_lsn || lsn > m_rd_sb->durable_commit_lsn) { return; }
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class RaftReplDev : public ReplDev,

/**
 * Forwards to m_data_journal->wait_for_log_store_ready().
 */
void wait_for_logstore_ready() {
    m_data_journal->wait_for_log_store_ready();
}

void gc_repl_reqs();

/**
* Flush the durable commit LSN to the superblock
*/
Expand Down
11 changes: 11 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,21 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
return m_success_ptr;
}

/**
 * Invokes the callback once for each entry visited while iterating m_lsn_req_map.
 *
 * @param cb callback receiving the entry's key (the lsn) and the request linked to it.
 */
void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
    // Bind by const reference: a by-value binding copies every map entry (key plus request pointer) on each
    // iteration, on top of the copy already made when the request is passed to cb.
    for (auto const& [lsn, rreq] : m_lsn_req_map) {
        cb(lsn, rreq);
    }
}

/**
 * @return the repl dev's last commit lsn (m_rd.get_last_commit_lsn()) converted with uint64_cast.
 */
uint64_t RaftStateMachine::last_commit_index() {
    auto const commit_lsn = m_rd.get_last_commit_lsn();
    return uint64_cast(commit_lsn);
}

/**
 * Forwards the call to the owning repl dev (m_rd.become_ready()).
 */
void RaftStateMachine::become_ready() {
    m_rd.become_ready();
}

/**
 * Removes the entry for the given lsn from m_lsn_req_map, if there is one.
 *
 * @param lsn key of the entry to drop; an lsn with no entry is ignored.
 */
void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
    // Erasing by key does nothing when the key is absent, so a preceding find() only costs a second lookup.
    m_lsn_req_map.erase(lsn);
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
rreq->set_lsn(lsn);
rreq->add_state(repl_req_state_t::LOG_RECEIVED);
Expand Down
5 changes: 4 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ using AsyncNotifier = folly::Promise< folly::Unit >;
class RaftReplDev;
class RaftStateMachine : public nuraft::state_machine {
private:
folly::ConcurrentHashMap< int64_t, repl_req_ptr_t > m_lsn_req_map;
folly::ConcurrentHashMap< int64_t /*lsn*/, repl_req_ptr_t > m_lsn_req_map;
RaftReplDev& m_rd;
nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft
// iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle};
Expand Down Expand Up @@ -121,9 +121,12 @@ class RaftStateMachine : public nuraft::state_machine {
repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry);
repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry);
void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn);
void unlink_lsn_to_req(int64_t lsn);
repl_req_ptr_t lsn_to_req(int64_t lsn);
nuraft_mesg::repl_service_ctx* group_msg_service();

void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb);

std::string rdev_name() const;

private:
Expand Down
15 changes: 12 additions & 3 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ void RaftReplService::start_reaper_thread() {
// Schedule the rdev garbage collector timer
m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */,
nullptr, [this](void*) { gc_repl_devs(); });
nullptr, [this](void*) {
gc_repl_reqs();
gc_repl_devs();
});

// Check for queued fetches at the minimum every second
uint64_t interval_ns =
Expand All @@ -352,9 +355,7 @@ void RaftReplService::start_reaper_thread() {
p.setValue();
} else {
// Cancel all recurring timers started
#if 0
iomanager.cancel_timer(m_rdev_gc_timer_hdl, true /* wait */);
#endif
iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true /* wait */);
iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true /* wait */);
}
Expand Down Expand Up @@ -388,6 +389,14 @@ void RaftReplService::fetch_pending_data() {
}
}

/**
 * Runs replication-request garbage collection on every repl dev registered in m_rd_map.
 *
 * m_rd_map_mtx is held in shared mode for the duration of the walk. Entries that are empty or are not a
 * RaftReplDev are skipped.
 */
void RaftReplService::gc_repl_reqs() {
    std::shared_lock lg(m_rd_map_mtx);
    for (auto const& entry : m_rd_map) {
        // dynamic_pointer_cast yields an empty pointer when the cast fails; guard it rather than dereferencing
        // a null pointer.
        if (auto rdev = std::dynamic_pointer_cast< RaftReplDev >(entry.second); rdev) { rdev->gc_repl_reqs(); }
    }
}

void RaftReplService::gc_repl_devs() {
std::unique_lock lg(m_rd_map_mtx);
for (auto it = m_rd_map.begin(); it != m_rd_map.end();) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ class RaftReplService : public GenericReplService,
void stop_reaper_thread();
void fetch_pending_data();
void gc_repl_devs();
void gc_repl_reqs();
void flush_durable_commit_lsn();

};

class RaftReplServiceCPHandler : public CPCallbacks {
Expand Down
44 changes: 44 additions & 0 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,50 @@ TEST_F(RaftReplDevTest, RemoveReplDev) {
g_helper->sync_for_cleanup_start();
}

#ifdef _PRERELEASE
// Garbage collect the replication requests.
//
// Test flow:
//   1. Lower consensus.repl_req_timeout_sec to 5 (remembering the previous value) so requests expire quickly.
//   2. On every replica except replica 0, set the "drop_push_data_request" flip. This simulates the data push being
//      dropped so that fetch data can be triggered (if both data and raft channel are received, we won't have
//      timed-out rreqs).
//   3. Write 100 entries on the leader, restart replica 0, then write 100 more entries.
//   4. Validate all data written so far.
//   5. Restore the previous timeout setting.
TEST_F(RaftReplDevTest, GCReplReqs) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

// Step 1: shrink the request timeout and keep the old value so it can be restored at the end.
// NOTE(review): the log text below spells the setting "repl_req_timout_sec"; the field is repl_req_timeout_sec.
uint32_t prev_timeout_sec{0};
LOGINFO("Set the repl_req_timout_sec to be fairly small to force GC to kick in");
HS_SETTINGS_FACTORY().modifiable_settings([&prev_timeout_sec](auto& s) {
prev_timeout_sec = s.consensus.repl_req_timeout_sec;
s.consensus.repl_req_timeout_sec = 5;
});
HS_SETTINGS_FACTORY().save();

// Step 2: only replicas other than replica 0 set the flip.
// NOTE(review): the log text says "fake fetch data request", but the flip set is "drop_push_data_request".
if (g_helper->replica_num() != 0) {
LOGINFO("Set flip to fake fetch data request on data channel");
set_basic_flip("drop_push_data_request");
}

// Step 3a: first batch of writes.
this->write_on_leader(100 /* num_entries */, true /* wait_for_commit */);

// Step 3b: Restart replica-0 (Leader).
// NOTE(review): the meaning of the second argument (10) is not visible here — confirm against restart_replica.
this->restart_replica(0, 10);

// Step 3c: second batch of writes, issued after the restart call above.
LOGINFO("After original leader is shutdown, insert more entries into the new leader");
this->write_on_leader(100, true /* wait for commit on all replicas */);

g_helper->sync_for_verify_start();

// Step 4: read back and validate.
LOGINFO("Validate all data written so far by reading them");
this->validate_data();

// Step 5: Set the settings back and save. This is needed (if we ever give a --config in the test)
LOGINFO("Set the repl_req_timeout back to previous value={}", prev_timeout_sec);
HS_SETTINGS_FACTORY().modifiable_settings(
[prev_timeout_sec](auto& s) { s.consensus.repl_req_timeout_sec = prev_timeout_sec; });
HS_SETTINGS_FACTORY().save();

g_helper->sync_for_cleanup_start();
}
#endif

int main(int argc, char* argv[]) {
int parsed_argc = argc;
char** orig_argv = argv;
Expand Down

0 comments on commit 37fa2bc

Please sign in to comment.