From da8e8df287b0c2d333d3649e1428ccfe3505a966 Mon Sep 17 00:00:00 2001 From: Ravi Nagarjun Akella Date: Thu, 15 Aug 2024 12:47:39 -0700 Subject: [PATCH] convert log store lsn to repl lsn in repl_dev --- conanfile.py | 2 +- .../log_store/home_raft_log_store.cpp | 1 - .../log_store/home_raft_log_store.h | 2 ++ src/lib/replication/repl_dev/common.cpp | 1 + .../replication/repl_dev/raft_repl_dev.cpp | 34 +++++++++++-------- .../repl_dev/raft_state_machine.cpp | 4 +-- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/conanfile.py b/conanfile.py index 701ddb96c..09124b55b 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.46" + version = "6.4.47" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 85f55d7dc..6a589dc84 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -43,7 +43,6 @@ static constexpr logstore_seq_num_t to_store_lsn(uint64_t raft_lsn) { static constexpr logstore_seq_num_t to_store_lsn(repl_lsn_t repl_lsn) { return static_cast< logstore_seq_num_t >(repl_lsn - 1); } -static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; } static uint64_t extract_term(const log_buffer& log_bytes) { uint8_t const* raw_ptr = log_bytes.bytes(); diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index d39842af5..da4137e8f 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -223,4 +223,6 @@ static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(sisl::blob const& lo static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(const log_buffer& log_bytes) { return to_nuraft_log_entry(log_bytes.get_blob()); } + +static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; } } // namespace homestore diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 612941834..47947cd7c 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -108,6 +108,7 @@ void repl_req_ctx::set_lsn(int64_t lsn) { DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn), "Changing lsn for request={} on the fly can cause race condition, not expected", to_string()); m_lsn = lsn; + LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string()); } bool repl_req_ctx::save_pushed_data(intrusive< sisl::GenericRpcData > const& pushed_data, uint8_t const* data, diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 30f13b3cd..7a79ad8a5 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -77,10 +77,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } RD_LOG(INFO, - "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={} " + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} " + "next_dsn={} " "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); + m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); #ifdef _PRERELEASE m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) { @@ -230,20 +231,20 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t), PushDataRequestTypeTable()));*/ - RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string()); + RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_string()); group_msg_service() ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts) .via(&folly::InlineExecutor::instance()) .thenValue([this, rreq = std::move(rreq)](auto e) { if (e.hasError()) { - RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", - rreq->to_compact_string(), e.error()); + RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", rreq->to_string(), + e.error()); handle_error(rreq, RaftReplService::to_repl_error(e.error())); return; } // Release the buffer which holds the packets - RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); + RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string()); rreq->release_fb_builder(); rreq->m_pkts.clear(); }); @@ -766,7 +767,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { m_next_dsn.compare_exchange_strong(cur_dsn, rreq->dsn() + 1); } - RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_compact_string()); + RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string()); if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) { leave(); } else { @@ -775,7 +776,9 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { if (!recovery) { auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn()); - RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev"); + RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn, + "Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}", + rreq->lsn(), prev_lsn); } if (!rreq->is_proposer()) { rreq->clear(); } } @@ -1097,7 +1100,8 @@ void RaftReplDev::gc_repl_reqs() { if (rreq->is_expired()) { expired_keys.push_back(key); - RD_LOGD("rreq=[{}] is expired, cleaning up", rreq->to_compact_string()); + RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(), + get_elapsed_time_sec(rreq->created_time())); // do garbage collection // 1. free the allocated blocks @@ -1124,8 +1128,9 @@ void RaftReplDev::gc_repl_reqs() { } void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) { + auto repl_lsn = to_repl_lsn(lsn); // apply the log entry if the lsn is between checkpoint lsn and durable commit lsn - if (lsn < m_rd_sb->checkpoint_lsn) { return; } + if (repl_lsn < m_rd_sb->checkpoint_lsn) { return; } // 1. Get the log entry and prepare rreq auto const lentry = to_nuraft_log_entry(buf); @@ -1166,15 +1171,16 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), (entry_blkid.blk_count() * get_blk_size())); rreq->set_local_blkid(entry_blkid); - rreq->set_lsn(lsn); + rreq->set_lsn(repl_lsn); + RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string()); - if (lsn > m_rd_sb->durable_commit_lsn) { - m_state_machine->link_lsn_to_req(rreq, int64_cast(lsn)); + if (repl_lsn > m_rd_sb->durable_commit_lsn) { + m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn)); return; } // 2. Pre-commit the log entry - m_listener->on_pre_commit(lsn, entry_to_hdr(jentry), entry_to_key(jentry), nullptr); + m_listener->on_pre_commit(repl_lsn, entry_to_hdr(jentry), entry_to_key(jentry), nullptr); // 3. Commit the log entry handle_commit(rreq, true /* recovery */); diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 8300fcce8..92ae90b46 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -189,8 +189,8 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn, m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size()); repl_req_ptr_t rreq = lsn_to_req(lsn); - RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq"); - RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); + RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq for lsn={}", lsn); + RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_string()); if (rreq->is_proposer()) { // This is the time to ensure flushing of journal happens in the proposer rreq->add_state(repl_req_state_t::LOG_FLUSHED);