Skip to content

Commit

Permalink
Releasing data buf from memory after written to disk.
Browse files Browse the repository at this point in the history
Data buffer persists in memory until rreq is committed or rolled back.

This approach poses issues during recovery. As new data arrives via
push_data and is written to disk, it remains in memory for an extended
period until the replica catches up and commits the rreq.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Nov 7, 2024
1 parent 50f42ff commit 1ce9bbd
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 3 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.7"
version = "6.5.8"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
6 changes: 5 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
sisl::blob const& key() const { return m_key; }
MultiBlkId const& local_blkid() const { return m_local_blkid; }
RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
const char* data() const { return r_cast< const char* >(m_data); }
const char* data() const {
DEBUG_ASSERT(m_data != nullptr, "m_data is nullptr, use before save_pushed/fetched_data or after release_data()");
return r_cast< const char* >(m_data);
}
repl_req_state_t state() const { return repl_req_state_t(m_state.load()); }
bool has_state(repl_req_state_t s) const { return m_state.load() & uint32_cast(s); }
repl_journal_entry const* journal_entry() const { return m_journal_entry; }
Expand Down Expand Up @@ -209,6 +212,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
bool add_state_if_not_already(repl_req_state_t s);
void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; }
void clear();
void release_data();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }

Expand Down
9 changes: 8 additions & 1 deletion src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,19 @@ bool repl_req_ctx::add_state_if_not_already(repl_req_state_t s) {
void repl_req_ctx::clear() {
m_header = sisl::blob{};
m_key = sisl::blob{};
release_data();
m_pkts.clear();
}

void repl_req_ctx::release_data() {
m_data = nullptr;
// explicitly clear m_buf_for_unaligned_data as unaligned pushdata/fetchdata will be saved here
m_buf_for_unaligned_data = sisl::io_blob_safe{};
if (m_pushed_data) {
m_pushed_data->send_response();
m_pushed_data = nullptr;
}
m_fetched_data = sisl::GenericClientResponse{};
m_pkts.clear();
}

static std::string req_state_name(uint32_t state) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
} else {
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
rreq->release_data();
const auto data_log_diff_us =
push_data_rcv_time.time_since_epoch().count() > rreq->created_time().time_since_epoch().count()
? get_elapsed_time_us(rreq->created_time(), push_data_rcv_time)
Expand Down Expand Up @@ -862,6 +863,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
rreq->release_data();

RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, "
"total_write_latency_us={}, write_num_pieces={}",
Expand Down

0 comments on commit 1ce9bbd

Please sign in to comment.