diff --git a/conanfile.py b/conanfile.py index 870760692..5029beeca 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.52" + version = "6.4.53" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 5d1a0e8c2..7de51793e 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -18,6 +18,7 @@ template < typename T > using ptr = std::shared_ptr< T >; class buffer; +class log_entry; } // namespace nuraft namespace homestore { @@ -149,7 +150,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: raft_buf_ptr_t& raft_journal_buf(); uint8_t* raw_journal_buf(); - /////////////////////// Non modifiers methods ////////////////// std::string to_string() const; std::string to_compact_string() const; @@ -203,6 +203,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: void set_lsn(int64_t lsn); void add_state(repl_req_state_t s); bool add_state_if_not_already(repl_req_state_t s); + void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; } void clear(); flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; } void release_fb_builder() { m_fb_builder.Release(); } @@ -234,6 +235,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry repl_journal_entry* m_journal_entry{nullptr}; // pointer to the journal entry bool m_is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote + nuraft::ptr< nuraft::log_entry > m_lentry; /////////////// Replication state related section ///////////////// std::atomic< uint32_t > 
m_state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index bad06c16b..5d1056ff4 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -77,11 +77,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } RD_LOG(INFO, - "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} " - "next_dsn={} " + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, " + "compact_lsn={}, checkpoint_lsn:{}, next_dsn={} " "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); + m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(), + m_rd_sb->logdev_id, m_rd_sb->logstore_id); #ifdef _PRERELEASE m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) { @@ -746,7 +747,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); } -void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { +void RaftReplDev::commit_blk(repl_req_ptr_t rreq) { if (rreq->local_blkid().is_valid()) { if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) { if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done) @@ -755,6 +756,10 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { RD_DBG_ASSERT(false, "fail to commit blk when applying log in non-degraded mode.") } } +} + +void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { + commit_blk(rreq); // 
Remove the request from repl_key map. m_repl_key_req_map.erase(rreq->rkey()); @@ -1181,10 +1186,14 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx } rreq->set_lsn(repl_lsn); + // keep lentry in scope for the life cycle of the rreq + rreq->set_lentry(lentry); rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size); RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string()); if (repl_lsn > m_rd_sb->durable_commit_lsn) { + // In-memory state of these blks is lost. Commit them now to avoid usage of same blk twice. + commit_blk(rreq); m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn)); return; } @@ -1202,7 +1211,12 @@ bool RaftReplDev::is_resync_mode() { int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx(); int64_t const my_log_idx = raft_server()->get_last_log_idx(); auto diff = leader_commited_lsn - my_log_idx; - return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold); + bool resync_mode = (diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold)); + if (resync_mode) { + RD_LOGD("Raft Channel: Resync mode, leader_commited_lsn={}, my_log_idx={}, diff={}", leader_commited_lsn, + my_log_idx, diff); + } + return resync_mode; } } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index a68e3f578..3add630cf 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -258,6 +258,7 @@ class RaftReplDev : public ReplDev, void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); + void commit_blk(repl_req_ptr_t rreq); }; } // namespace homestore