-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Inlined data #521
Inlined data #521
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -291,7 +291,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d | |
} | ||
|
||
if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) { | ||
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string()); | ||
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_string()); | ||
return; | ||
} | ||
|
||
|
@@ -328,7 +328,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d | |
RD_LOGD("Data Channel: Data write completed for rreq=[{}], time_diff_data_log_us={}, " | ||
"data_write_latency_us={}, total_data_write_latency_us(rreq creation to write complete)={}, " | ||
"local_blkid.num_pieces={}", | ||
rreq->to_compact_string(), data_log_diff_us, data_write_latency, total_data_write_latency, | ||
rreq->to_string(), data_log_diff_us, data_write_latency, total_data_write_latency, | ||
write_num_pieces); | ||
} | ||
}); | ||
|
@@ -385,8 +385,7 @@ repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ | |
return nullptr; | ||
} | ||
|
||
RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), | ||
reinterpret_cast< uintptr_t >(rreq.get())); | ||
RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_string(), reinterpret_cast< uintptr_t >(rreq.get())); | ||
return rreq; | ||
} | ||
|
||
|
@@ -400,7 +399,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< | |
if (!rreq->has_linked_data()) { continue; } | ||
auto const status = uint32_cast(rreq->state()); | ||
if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { | ||
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); | ||
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string()); | ||
continue; | ||
} | ||
|
||
|
@@ -443,7 +442,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< | |
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN), | ||
"Data written promise raised without updating DATA_WRITTEN state for rkey={}", | ||
rreq->rkey().to_string()); | ||
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); | ||
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string()); | ||
} | ||
#endif | ||
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size()); | ||
|
@@ -498,11 +497,11 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq | |
auto const cur_state = uint32_cast(rreq->state()); | ||
if (cur_state == uint32_cast(repl_req_state_t::ERRORED)) { | ||
// We already received the data before, just ignore this data | ||
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_compact_string()); | ||
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_string()); | ||
continue; | ||
} else if (cur_state == uint32_cast(repl_req_state_t::DATA_RECEIVED)) { | ||
// We already received the data before, just ignore this data | ||
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_compact_string()); | ||
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_string()); | ||
continue; | ||
} | ||
|
||
|
@@ -526,7 +525,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq | |
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { | ||
if (rreqs.size() == 0) { return; } | ||
|
||
std::vector< ::flatbuffers::Offset< RequestEntry > > entries; | ||
std::vector<::flatbuffers::Offset< RequestEntry > > entries; | ||
entries.reserve(rreqs.size()); | ||
|
||
shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); | ||
|
@@ -544,10 +543,10 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { | |
// but if there were to be such case, we need to group rreqs by originator and send them in separate | ||
// batches; | ||
RD_DBG_ASSERT_EQ(rreq->remote_blkid().server_id, originator, "Unexpected originator for rreq={}", | ||
rreq->to_compact_string()); | ||
rreq->to_string()); | ||
|
||
RD_LOGT("Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}", originator, | ||
rreq->to_compact_string(), rreq->remote_blkid().blkid.to_string(), server_id()); | ||
rreq->to_string(), rreq->remote_blkid().blkid.to_string(), server_id()); | ||
} | ||
|
||
builder->FinishSizePrefixed( | ||
|
@@ -571,7 +570,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { | |
auto const fetch_latency_us = get_elapsed_time_us(fetch_start_time); | ||
HISTOGRAM_OBSERVE(m_metrics, rreq_data_fetch_latency_us, fetch_latency_us); | ||
|
||
RD_LOGD("Data Channel: FetchData from remote completed, time taken={} ms", fetch_latency_us); | ||
RD_LOGD("Data Channel: FetchData from remote completed, time taken={} us", fetch_latency_us); | ||
|
||
if (!response) { | ||
// if we are here, it means the original who sent the log entries are down. | ||
|
@@ -703,13 +702,13 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons | |
auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size(); | ||
|
||
if (!rreq->save_fetched_data(response, raw_data, data_size)) { | ||
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string()); | ||
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string()); | ||
auto const local_size = rreq->local_blkid().blk_count() * get_blk_size(); | ||
RD_DBG_ASSERT_EQ(data_size, local_size, "Data size mismatch for rreq={} remote size: {}, local size: {}", | ||
rreq->to_compact_string(), data_size, local_size); | ||
rreq->to_string(), data_size, local_size); | ||
|
||
RD_LOGD("Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", | ||
rreq->to_compact_string()); | ||
rreq->to_string()); | ||
} else { | ||
auto const data_write_start_time = Clock::now(); | ||
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1); | ||
|
@@ -734,11 +733,11 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons | |
|
||
RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, " | ||
"total_write_latency_us={}, write_num_pieces={}", | ||
rreq->to_compact_string(), data_write_latency, total_data_write_latency, write_num_pieces); | ||
rreq->to_string(), data_write_latency, total_data_write_latency, write_num_pieces); | ||
}); | ||
|
||
RD_LOGD("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}", | ||
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid().to_string()); | ||
rreq->to_string(), data_size, total_size, rreq->local_blkid().to_string()); | ||
} | ||
raw_data += data_size; | ||
total_size -= data_size; | ||
|
@@ -796,7 +795,7 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) | |
|
||
if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) { | ||
// Free the blks which is allocated already | ||
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); | ||
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err); | ||
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { | ||
auto blkid = rreq->local_blkid(); | ||
data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { | ||
|
@@ -1031,6 +1030,12 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu | |
auto raft_req = r_cast< nuraft::req_msg* >(param->ctx); | ||
auto const& entries = raft_req->log_entries(); | ||
|
||
auto start_lsn = raft_req->get_last_log_idx() + 1; | ||
RD_LOGD("Raft channel: Received {} append entries on follower from leader, term {}, lsn {} ~ {} , my commited " | ||
"lsn {} , leader commmited lsn {}", | ||
entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, | ||
m_commit_upto_lsn.load(), raft_req->get_commit_idx()); | ||
|
||
if (!entries.empty()) { | ||
RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", | ||
entries.size()); | ||
|
@@ -1166,12 +1171,17 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx | |
RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req"); | ||
auto rreq = it->second; | ||
RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string()); | ||
MultiBlkId entry_blkid; | ||
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); | ||
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), | ||
(entry_blkid.blk_count() * get_blk_size())); | ||
rreq->set_local_blkid(entry_blkid); | ||
uint32_t data_size{0u}; | ||
|
||
if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should assert if data linked, value_size should > 0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is already asserted in repl_contxt_t::init(), no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we assert in the init |
||
MultiBlkId entry_blkid; | ||
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); | ||
data_size = entry_blkid.blk_count() * get_blk_size(); | ||
rreq->set_local_blkid(entry_blkid); | ||
} | ||
|
||
rreq->set_lsn(repl_lsn); | ||
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size); | ||
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string()); | ||
|
||
if (repl_lsn > m_rd_sb->durable_commit_lsn) { | ||
|
@@ -1188,4 +1198,11 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx | |
|
||
void RaftReplDev::on_restart() { m_listener->on_restart(); } | ||
|
||
bool RaftReplDev::is_resync_mode() { | ||
int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be fine. |
||
int64_t const my_log_idx = raft_server()->get_last_log_idx(); | ||
auto diff = leader_commited_lsn - my_log_idx; | ||
return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold); | ||
} | ||
|
||
} // namespace homestore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we might want to tune this number later with E2E env, I am fine with this number for now.