Skip to content

Commit

Permalink
Inlined data (#521)
Browse files Browse the repository at this point in the history
* Allocate blks only for linked data on log found

* implement is_resync_mode method
  • Loading branch information
raakella1 authored Aug 24, 2024
1 parent 3030851 commit 0691a36
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.50"
version = "6.4.51"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ table Consensus {

// Frequency to flush durable commit LSN in millis
flush_durable_commit_interval_ms: uint64 = 500;

// Log difference to determine if the follower is in resync mode
resync_log_idx_threshold: int64 = 100;
}

table HomeStoreSettings {
Expand Down
63 changes: 40 additions & 23 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
}

if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string());
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_string());
return;
}

Expand Down Expand Up @@ -328,7 +328,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
RD_LOGD("Data Channel: Data write completed for rreq=[{}], time_diff_data_log_us={}, "
"data_write_latency_us={}, total_data_write_latency_us(rreq creation to write complete)={}, "
"local_blkid.num_pieces={}",
rreq->to_compact_string(), data_log_diff_us, data_write_latency, total_data_write_latency,
rreq->to_string(), data_log_diff_us, data_write_latency, total_data_write_latency,
write_num_pieces);
}
});
Expand Down Expand Up @@ -385,8 +385,7 @@ repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ
return nullptr;
}

RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(),
reinterpret_cast< uintptr_t >(rreq.get()));
RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_string(), reinterpret_cast< uintptr_t >(rreq.get()));
return rreq;
}

Expand All @@ -400,7 +399,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
if (!rreq->has_linked_data()) { continue; }
auto const status = uint32_cast(rreq->state());
if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) {
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
continue;
}

Expand Down Expand Up @@ -443,7 +442,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN),
"Data written promise raised without updating DATA_WRITTEN state for rkey={}",
rreq->rkey().to_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
}
#endif
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size());
Expand Down Expand Up @@ -498,11 +497,11 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
auto const cur_state = uint32_cast(rreq->state());
if (cur_state == uint32_cast(repl_req_state_t::ERRORED)) {
// We already received the data before, just ignore this data
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_compact_string());
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_string());
continue;
} else if (cur_state == uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
// We already received the data before, just ignore this data
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_string());
continue;
}

Expand All @@ -526,7 +525,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
std::vector<::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand All @@ -544,10 +543,10 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
// but if there were to be such case, we need to group rreqs by originator and send them in separate
// batches;
RD_DBG_ASSERT_EQ(rreq->remote_blkid().server_id, originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());
rreq->to_string());

RD_LOGT("Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}", originator,
rreq->to_compact_string(), rreq->remote_blkid().blkid.to_string(), server_id());
rreq->to_string(), rreq->remote_blkid().blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
Expand All @@ -571,7 +570,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
auto const fetch_latency_us = get_elapsed_time_us(fetch_start_time);
HISTOGRAM_OBSERVE(m_metrics, rreq_data_fetch_latency_us, fetch_latency_us);

RD_LOGD("Data Channel: FetchData from remote completed, time taken={} ms", fetch_latency_us);
RD_LOGD("Data Channel: FetchData from remote completed, time taken={} us", fetch_latency_us);

if (!response) {
// if we are here, it means the original who sent the log entries are down.
Expand Down Expand Up @@ -703,13 +702,13 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size();

if (!rreq->save_fetched_data(response, raw_data, data_size)) {
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string());
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string());
auto const local_size = rreq->local_blkid().blk_count() * get_blk_size();
RD_DBG_ASSERT_EQ(data_size, local_size, "Data size mismatch for rreq={} remote size: {}, local size: {}",
rreq->to_compact_string(), data_size, local_size);
rreq->to_string(), data_size, local_size);

RD_LOGD("Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.",
rreq->to_compact_string());
rreq->to_string());
} else {
auto const data_write_start_time = Clock::now();
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
Expand All @@ -734,11 +733,11 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons

RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, "
"total_write_latency_us={}, write_num_pieces={}",
rreq->to_compact_string(), data_write_latency, total_data_write_latency, write_num_pieces);
rreq->to_string(), data_write_latency, total_data_write_latency, write_num_pieces);
});

RD_LOGD("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid().to_string());
rreq->to_string(), data_size, total_size, rreq->local_blkid().to_string());
}
raw_data += data_size;
total_size -= data_size;
Expand Down Expand Up @@ -796,7 +795,7 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)

if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) {
// Free the blks which is allocated already
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err);
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) {
Expand Down Expand Up @@ -1031,6 +1030,12 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto raft_req = r_cast< nuraft::req_msg* >(param->ctx);
auto const& entries = raft_req->log_entries();

auto start_lsn = raft_req->get_last_log_idx() + 1;
RD_LOGD("Raft channel: Received {} append entries on follower from leader, term {}, lsn {} ~ {} , my commited "
"lsn {} , leader commmited lsn {}",
entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1,
m_commit_upto_lsn.load(), raft_req->get_commit_idx());

if (!entries.empty()) {
RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them",
entries.size());
Expand Down Expand Up @@ -1166,12 +1171,17 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req");
auto rreq = it->second;
RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string());
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
(entry_blkid.blk_count() * get_blk_size()));
rreq->set_local_blkid(entry_blkid);
uint32_t data_size{0u};

if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) {
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
data_size = entry_blkid.blk_count() * get_blk_size();
rreq->set_local_blkid(entry_blkid);
}

rreq->set_lsn(repl_lsn);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

if (repl_lsn > m_rd_sb->durable_commit_lsn) {
Expand All @@ -1188,4 +1198,11 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx

// Forwards the restart notification to the registered replication listener.
void RaftReplDev::on_restart() { m_listener->on_restart(); }

// Returns true when this follower is lagging the leader far enough to be
// considered "in resync": i.e. the gap between the leader's committed log
// index and our own last log index exceeds the configured
// consensus.resync_log_idx_threshold.
bool RaftReplDev::is_resync_mode() {
    // Keep explicit signed types for the subtraction: the raft index getters
    // may return unsigned values, and the lag can legitimately be negative.
    int64_t const leader_committed_idx = raft_server()->get_leader_committed_log_idx();
    int64_t const local_last_log_idx = raft_server()->get_last_log_idx();
    int64_t const lag = leader_committed_idx - local_last_log_idx;
    return lag > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
}

} // namespace homestore
3 changes: 1 addition & 2 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ class RaftReplDev : public ReplDev,

iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};
Clock::time_point m_destroyed_time;
folly::Promise< ReplServiceError > m_destroy_promise;
RaftReplDevMetrics m_metrics;
Expand Down Expand Up @@ -255,7 +254,7 @@ class RaftReplDev : public ReplDev,
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs);
bool is_resync_mode() { return m_resync_mode; }
bool is_resync_mode();
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
Expand Down

0 comments on commit 0691a36

Please sign in to comment.