diff --git a/conanfile.py b/conanfile.py
index 30286bda7..0afc2ba77 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -5,7 +5,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.14"
+    version = "6.4.15"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h
index ee06c5908..4dd57ce2d 100644
--- a/src/include/homestore/replication/repl_dev.h
+++ b/src/include/homestore/replication/repl_dev.h
@@ -103,6 +103,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     std::string to_string() const;
     std::string to_compact_string() const;
     Clock::time_point created_time() const { return m_start_time; }
+    bool is_expired() const;
 
     /////////////////////// All Modifiers methods //////////////////
diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs
index 7e7121143..c283bc9ba 100644
--- a/src/lib/common/homestore_config.fbs
+++ b/src/lib/common/homestore_config.fbs
@@ -251,6 +251,9 @@ table Consensus {
     // Timeout for data to be received after raft entry after which raft entry is rejected.
    data_receive_timeout_ms: uint64 = 10000;
 
+    // ReplDev Reqs timeout in seconds.
+    repl_req_timeout_sec: uint32 = 300;
+
     // Frequency to flush durable commit LSN in millis
     flush_durable_commit_interval_ms: uint64 = 500;
 }
diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp
index c74a45fb6..612941834 100644
--- a/src/lib/replication/repl_dev/common.cpp
+++ b/src/lib/replication/repl_dev/common.cpp
@@ -3,6 +3,7 @@
 #include 
 #include 
 #include 
+#include 
 #include "replication/repl_dev/common.h"
 #include 
@@ -196,4 +197,8 @@ std::string repl_req_ctx::to_compact_string() const {
                        enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state())));
 }
 
-} // namespace homestore
\ No newline at end of file
+bool repl_req_ctx::is_expired() const {
+    return get_elapsed_time_sec(m_start_time) > HS_DYNAMIC_CONFIG(consensus.repl_req_timeout_sec);
+}
+
+} // namespace homestore
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index 3afb6a7c5..b3cf0ae83 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -473,7 +473,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
 void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
     if (rreqs.size() == 0) { return; }
 
-    std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
+    std::vector<::flatbuffers::Offset< RequestEntry > > entries;
     entries.reserve(rreqs.size());
 
     shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
@@ -997,6 +997,42 @@ void RaftReplDev::cp_flush(CP*) {
 
 void RaftReplDev::cp_cleanup(CP*) {}
 
+void RaftReplDev::gc_repl_reqs() {
+    std::vector< int64_t > expired_keys;
+    m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) {
+        if (rreq->is_proposer()) {
+            // don't clean up proposer's request
+            return;
+        }
+
+        if (rreq->is_expired()) {
+            expired_keys.push_back(key);
+            RD_LOGD("rreq=[{}] is expired, cleaning up", rreq->to_compact_string());
+
+            // do garbage collection
+            // 1. free the allocated blocks
+            if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
+                auto blkid = rreq->local_blkid();
+                data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
+                    HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
+                                  blkid.to_string());
+                    RD_LOGD("blkid={} freed successfully", blkid.to_string());
+                });
+            }
+
+            // 2. remove from the m_repl_key_req_map
+            // handle_error during fetch data response might have already removed the rreq from this map
+            if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) {
+                m_repl_key_req_map.erase(rreq->rkey());
+            }
+        }
+    });
+
+    for (auto const& l : expired_keys) {
+        m_state_machine->unlink_lsn_to_req(l);
+    }
+}
+
 void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) {
     // apply the log entry if the lsn is between checkpoint lsn and durable commit lsn
     if (lsn < m_rd_sb->checkpoint_lsn || lsn > m_rd_sb->durable_commit_lsn) { return; }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index cd1eae043..1395dfeb4 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -176,6 +176,8 @@ class RaftReplDev : public ReplDev,
 
     void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); }
 
+    void gc_repl_reqs();
+
     /**
      * Flush the durable commit LSN to the superblock
      */
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index 239303e5e..4f0d3f80a 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -198,10 +198,21 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
     return m_success_ptr;
 }
 
+void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
+    for (auto [key, rreq] : m_lsn_req_map) {
+        cb(key, rreq);
+    }
+}
+
 uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); }
 
 void RaftStateMachine::become_ready() { m_rd.become_ready(); }
 
+void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
+    auto const it = m_lsn_req_map.find(lsn);
+    if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); }
+}
+
 void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
     rreq->set_lsn(lsn);
     rreq->add_state(repl_req_state_t::LOG_RECEIVED);
diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h
index c99057443..db870555a 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.h
+++ b/src/lib/replication/repl_dev/raft_state_machine.h
@@ -92,7 +92,7 @@ using AsyncNotifier = folly::Promise< folly::Unit >;
 class RaftReplDev;
 class RaftStateMachine : public nuraft::state_machine {
 private:
-    folly::ConcurrentHashMap< int64_t, repl_req_ptr_t > m_lsn_req_map;
+    folly::ConcurrentHashMap< int64_t /*lsn*/, repl_req_ptr_t > m_lsn_req_map;
     RaftReplDev& m_rd;
     nuraft::ptr< nuraft::buffer > m_success_ptr; // Preallocate the success return to raft
     // iomgr::timer_handle_t m_wait_blkid_write_timer_hdl{iomgr::null_timer_handle};
@@ -121,9 +121,12 @@ class RaftStateMachine : public nuraft::state_machine {
     repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry);
     repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry);
     void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn);
+    void unlink_lsn_to_req(int64_t lsn);
     repl_req_ptr_t lsn_to_req(int64_t lsn);
     nuraft_mesg::repl_service_ctx* group_msg_service();
 
+    void iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb);
+
     std::string rdev_name() const;
 
 private:
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index 9e08cabd0..a8291d8d1 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -335,7 +335,10 @@ void RaftReplService::start_reaper_thread() {
     // Schedule the rdev garbage collector timer
     m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
         HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */,
-        nullptr, [this](void*) { gc_repl_devs(); });
+        nullptr, [this](void*) {
+            gc_repl_reqs();
+            gc_repl_devs();
+        });
 
     // Check for queued fetches at the minimum every second
     uint64_t interval_ns =
@@ -352,9 +355,7 @@ void RaftReplService::start_reaper_thread() {
         p.setValue();
     } else {
         // Cancel all recurring timers started
-#if 0
         iomanager.cancel_timer(m_rdev_gc_timer_hdl, true /* wait */);
-#endif
         iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true /* wait */);
         iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true /* wait */);
     }
@@ -388,6 +389,14 @@ void RaftReplService::fetch_pending_data() {
     }
 }
 
+void RaftReplService::gc_repl_reqs() {
+    std::shared_lock lg(m_rd_map_mtx);
+    for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) {
+        auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second);
+        rdev->gc_repl_reqs();
+    }
+}
+
 void RaftReplService::gc_repl_devs() {
     std::unique_lock lg(m_rd_map_mtx);
     for (auto it = m_rd_map.begin(); it != m_rd_map.end();) {
diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h
index 1de44d848..4553adb59 100644
--- a/src/lib/replication/service/raft_repl_service.h
+++ b/src/lib/replication/service/raft_repl_service.h
@@ -79,7 +79,9 @@ class RaftReplService : public GenericReplService,
     void stop_reaper_thread();
     void fetch_pending_data();
     void gc_repl_devs();
+    void gc_repl_reqs();
     void flush_durable_commit_lsn();
+
 };
 
 class RaftReplServiceCPHandler : public CPCallbacks {
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 32df7ee0d..6a791cbdf 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -660,6 +660,50 @@ TEST_F(RaftReplDevTest, RemoveReplDev) {
     g_helper->sync_for_cleanup_start();
 }
 
+#ifdef _PRERELEASE
+// Garbage collect the replication requests.
+// 0. Simulate that the push data is dropped so that fetch data gets triggered (if a request is received on both the
+// data and raft channels, there will be no timed-out rreqs).
+TEST_F(RaftReplDevTest, GCReplReqs) {
+    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
+    g_helper->sync_for_test_start();
+
+    uint32_t prev_timeout_sec{0};
+    LOGINFO("Set the repl_req_timeout_sec to be fairly small to force GC to kick in");
+    HS_SETTINGS_FACTORY().modifiable_settings([&prev_timeout_sec](auto& s) {
+        prev_timeout_sec = s.consensus.repl_req_timeout_sec;
+        s.consensus.repl_req_timeout_sec = 5;
+    });
+    HS_SETTINGS_FACTORY().save();
+
+    if (g_helper->replica_num() != 0) {
+        LOGINFO("Set flip to drop push data request on the data channel");
+        set_basic_flip("drop_push_data_request");
+    }
+
+    this->write_on_leader(100 /* num_entries */, true /* wait_for_commit */);
+
+    // Restart replica-0 (the original leader)
+    this->restart_replica(0, 10);
+
+    LOGINFO("After original leader is shut down, insert more entries into the new leader");
+    this->write_on_leader(100, true /* wait for commit on all replicas */);
+
+    g_helper->sync_for_verify_start();
+
+    LOGINFO("Validate all data written so far by reading it back");
+    this->validate_data();
+
+    // Restore the previous settings and save them. This is needed in case a --config is ever passed to the test.
+    LOGINFO("Set the repl_req_timeout back to previous value={}", prev_timeout_sec);
+    HS_SETTINGS_FACTORY().modifiable_settings(
+        [prev_timeout_sec](auto& s) { s.consensus.repl_req_timeout_sec = prev_timeout_sec; });
+    HS_SETTINGS_FACTORY().save();
+
+    g_helper->sync_for_cleanup_start();
+}
+#endif
+
 int main(int argc, char* argv[]) {
     int parsed_argc = argc;
     char** orig_argv = argv;