From 1ce9bbdd5fd98b69764cee17b7f019fefb3c153d Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Thu, 7 Nov 2024 12:01:22 +0800 Subject: [PATCH] Releasing data buf from memory after written to disk. Data buffer persists in memory until rreq is committed or rolled back. This approach poses issues during recovery. As new data arrives via push_data and is written to disk, it remains in memory for an extended period until the replica catches up and commits the rreq. Signed-off-by: Xiaoxi Chen --- conanfile.py | 2 +- src/include/homestore/replication/repl_dev.h | 6 +++++- src/lib/replication/repl_dev/common.cpp | 9 ++++++++- src/lib/replication/repl_dev/raft_repl_dev.cpp | 2 ++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/conanfile.py b/conanfile.py index 06e091ba0..6baa5e900 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.7" + version = "6.5.8" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 1abf5ea12..cf0e00a0c 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -143,7 +143,10 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: sisl::blob const& key() const { return m_key; } MultiBlkId const& local_blkid() const { return m_local_blkid; } RemoteBlkId const& remote_blkid() const { return m_remote_blkid; } - const char* data() const { return r_cast< const char* >(m_data); } + const char* data() const { + DEBUG_ASSERT(m_data != nullptr, "m_data is nullptr, use before save_pushed/fetched_data or after release_data()"); + return r_cast< const char* >(m_data); + } repl_req_state_t state() const { return repl_req_state_t(m_state.load()); } bool has_state(repl_req_state_t s) const { return m_state.load() & uint32_cast(s); } repl_journal_entry const* journal_entry() const { return m_journal_entry; } @@ -209,6 +212,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: bool add_state_if_not_already(repl_req_state_t s); void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; } void clear(); + void release_data(); flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; } void release_fb_builder() { m_fb_builder.Release(); } diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index b8800afea..4fcbb0f4e 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -164,12 +164,19 @@ bool repl_req_ctx::add_state_if_not_already(repl_req_state_t s) { void repl_req_ctx::clear() { m_header = sisl::blob{}; m_key = sisl::blob{}; + release_data(); + m_pkts.clear(); +} + +void repl_req_ctx::release_data() { + m_data = nullptr; + // explicitly clear m_buf_for_unaligned_data as unaligned pushdata/fetchdata will be saved here + m_buf_for_unaligned_data = sisl::io_blob_safe{}; if (m_pushed_data) { m_pushed_data->send_response(); m_pushed_data = nullptr; } m_fetched_data = sisl::GenericClientResponse{}; - m_pkts.clear(); } static std::string req_state_name(uint32_t state) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b1ff61dbb..59916d039 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -444,6 +444,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d } else { rreq->add_state(repl_req_state_t::DATA_WRITTEN); rreq->m_data_written_promise.setValue(); + rreq->release_data(); const auto data_log_diff_us = push_data_rcv_time.time_since_epoch().count() > rreq->created_time().time_since_epoch().count() ? get_elapsed_time_us(rreq->created_time(), push_data_rcv_time) @@ -862,6 +863,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons "Error in writing data"); // TODO: Find a way to return error to the Listener rreq->add_state(repl_req_state_t::DATA_WRITTEN); rreq->m_data_written_promise.setValue(); + rreq->release_data(); RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, " "total_write_latency_us={}, write_num_pieces={}",