diff --git a/conanfile.py b/conanfile.py index ca96905c5..06e091ba0 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.6" + version = "6.5.7" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index c2223455f..1abf5ea12 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -126,7 +126,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: friend class SoloReplDev; public: - repl_req_ctx() {} + repl_req_ctx() { m_start_time = Clock::now(); } virtual ~repl_req_ctx(); void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header, sisl::blob const& key, uint32_t data_size); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index f3a4a2461..b1ff61dbb 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -895,7 +895,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { // Remove the request from repl_key map. m_repl_key_req_map.erase(rreq->rkey()); // Remove the request from lsn map. - m_state_machine->unlink_lsn_to_req(rreq->lsn()); + m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq); auto cur_dsn = m_next_dsn.load(std::memory_order_relaxed); while (cur_dsn <= rreq->dsn()) { @@ -1191,9 +1191,22 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu entries.size()); auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); - for (auto& entry : entries) { + auto last_commit_lsn = uint64_cast(get_last_commit_lsn()); + for (unsigned long i = 0; i < entries.size(); i++) { + auto& entry = entries[i]; + auto lsn = start_lsn + i; + auto term = entry->get_term(); if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } + // skipping localize for already committed log(dup), they anyway will be discard + // by nuraft before append_log. + if (lsn <= last_commit_lsn) { + RD_LOGT("Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn, + last_commit_lsn); + continue; + } + // Those LSNs already in logstore but not yet committed, will be dedup here, + // applier_create_req will return same req as previous one auto req = m_state_machine->localize_journal_entry_prepare(*entry); if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); @@ -1265,39 +1278,71 @@ cshared< ReplDevCPContext > RaftReplDev::get_cp_ctx(CP* cp) { void RaftReplDev::cp_cleanup(CP*) {} void RaftReplDev::gc_repl_reqs() { - std::vector< int64_t > expired_keys; - m_state_machine->iterate_repl_reqs([this, &expired_keys](auto key, auto rreq) { + auto cur_dsn = m_next_dsn.load(); + if (cur_dsn != 0) cur_dsn = cur_dsn - 1; + // On follower, DSN below cur_dsn should very likely be commited. + // It is not guaranteed because DSN and LSN are generated separately, + // DSN in async_alloc_write before pushing data, LSN later when + // proposing to raft. Two simultaneous write requests on leader can have + // and during the window. + std::vector< repl_req_ptr_t > expired_rreqs; + + auto req_map_size = m_repl_key_req_map.size(); + RD_LOGI("m_repl_key_req_map size is {};", req_map_size); + for (auto [key, rreq] : m_repl_key_req_map) { + // FIXME: Skipping proposer for now, the DSN in proposer increased in proposing stage, not when commit(). + // Need other mechanism. + if (rreq->is_proposer()) { + // don't clean up proposer's request + continue; + } + if (rreq->dsn() < cur_dsn && rreq->is_expired()) { + // The DSN can be out of order, wait till rreq expired. + RD_LOGD("legacy req with commited DSN, rreq=[{}] , dsn = {}, next_dsn = {}, gap= {}, elapsed_time_sec {}", + rreq->to_string(), rreq->dsn(), cur_dsn, cur_dsn - rreq->dsn(), + get_elapsed_time_sec(rreq->created_time())); + expired_rreqs.push_back(rreq); + } + } + int sm_req_cnt = 0; + // FIXME: we ensured data written before appending log to log store, in which we add rreq to state_machine + // and during pre-commit/commit we retrieve rreq from state_machine. Removing requests outside of state + // machine is risky. + // Below logs are logging only, can be removed once we get more confidence. + m_state_machine->iterate_repl_reqs([this, cur_dsn, &sm_req_cnt](auto key, auto rreq) { + sm_req_cnt++; if (rreq->is_proposer()) { // don't clean up proposer's request return; } - if (rreq->is_expired()) { - expired_keys.push_back(key); - RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(), + RD_LOGD("StateMachine: rreq=[{}] is expired, elapsed_time_sec{};", rreq->to_string(), get_elapsed_time_sec(rreq->created_time())); - - // do garbage collection - // 1. free the allocated blocks - if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { - auto blkid = rreq->local_blkid(); - data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) { - HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", - blkid.to_string()); - RD_LOGD("blkid={} freed successfully", blkid.to_string()); - }); - } - - // 2. remove from the m_repl_key_req_map - // handle_error during fetch data response might have already removed the rreq from the this map - if (m_repl_key_req_map.find(rreq->rkey()) != m_repl_key_req_map.end()) { - m_repl_key_req_map.erase(rreq->rkey()); - } } }); + RD_LOGI("state_machine req map size is {};", sm_req_cnt); - for (auto const& l : expired_keys) { - m_state_machine->unlink_lsn_to_req(l); + for (auto removing_rreq : expired_rreqs) { + // once log flushed, the commit progress controlled by raft + if (removing_rreq->has_state(repl_req_state_t::LOG_FLUSHED)) { + RD_LOGI("Skipping GC rreq [{}] because it is in state machine", removing_rreq->to_string()); + continue; + } + // do garbage collection + // 1. free the allocated blocks + RD_LOGI("Removing rreq [{}]", removing_rreq->to_string()); + if (removing_rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { + auto blkid = removing_rreq->local_blkid(); + data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) { + HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", + blkid.to_string()); + RD_LOGD("GC rreq: Releasing blkid={} freed successfully", blkid.to_string()); + }); + } + // 2. remove from the m_repl_key_req_map + if (m_repl_key_req_map.find(removing_rreq->rkey()) != m_repl_key_req_map.end()) { + m_repl_key_req_map.erase(removing_rreq->rkey()); + } } } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index d1b210526..ae8f2a193 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -221,11 +221,12 @@ uint64_t RaftStateMachine::last_commit_index() { void RaftStateMachine::become_ready() { m_rd.become_ready(); } -void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) { - auto const it = m_lsn_req_map.find(lsn); - if (it != m_lsn_req_map.cend()) { - RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string()); - m_lsn_req_map.erase(lsn); +void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) { + // it is possible a LSN mapped to different rreq in history + // due to log overwritten. Verify the rreq before removing + auto deleted = m_lsn_req_map.erase_if_equal(lsn, rreq); + if (deleted) { + RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string()); } } diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index b931e42f4..a19d9a0ec 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -126,7 +126,7 @@ class RaftStateMachine : public nuraft::state_machine { repl_req_ptr_t localize_journal_entry_prepare(nuraft::log_entry& lentry); repl_req_ptr_t localize_journal_entry_finish(nuraft::log_entry& lentry); void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn); - void unlink_lsn_to_req(int64_t lsn); + void unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq); repl_req_ptr_t lsn_to_req(int64_t lsn); nuraft_mesg::repl_service_ctx* group_msg_service();