Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inlined_data #520

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.50"
version = "6.4.51"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
3 changes: 3 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ table Consensus {

// Frequency to flush durable commit LSN in millis
flush_durable_commit_interval_ms: uint64 = 500;

// Log difference to determine if the follower is in resync mode
resync_log_idx_threshold: int64 = 100;
}

table HomeStoreSettings {
Expand Down
59 changes: 38 additions & 21 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
}

if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string());
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_string());
return;
}

Expand Down Expand Up @@ -328,7 +328,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
RD_LOGD("Data Channel: Data write completed for rreq=[{}], time_diff_data_log_us={}, "
"data_write_latency_us={}, total_data_write_latency_us(rreq creation to write complete)={}, "
"local_blkid.num_pieces={}",
rreq->to_compact_string(), data_log_diff_us, data_write_latency, total_data_write_latency,
rreq->to_string(), data_log_diff_us, data_write_latency, total_data_write_latency,
write_num_pieces);
}
});
Expand Down Expand Up @@ -385,7 +385,7 @@ repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ
return nullptr;
}

RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(),
RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_string(),
reinterpret_cast< uintptr_t >(rreq.get()));
return rreq;
}
Expand All @@ -400,7 +400,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
if (!rreq->has_linked_data()) { continue; }
auto const status = uint32_cast(rreq->state());
if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) {
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
continue;
}

Expand Down Expand Up @@ -443,7 +443,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN),
"Data written promise raised without updating DATA_WRITTEN state for rkey={}",
rreq->rkey().to_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
}
#endif
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size());
Expand Down Expand Up @@ -498,11 +498,11 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
auto const cur_state = uint32_cast(rreq->state());
if (cur_state == uint32_cast(repl_req_state_t::ERRORED)) {
// We already received the data before, just ignore this data
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_compact_string());
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_string());
continue;
} else if (cur_state == uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
// We already received the data before, just ignore this data
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_compact_string());
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_string());
continue;
}

Expand Down Expand Up @@ -544,10 +544,10 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
// but if there were to be such case, we need to group rreqs by originator and send them in separate
// batches;
RD_DBG_ASSERT_EQ(rreq->remote_blkid().server_id, originator, "Unexpected originator for rreq={}",
rreq->to_compact_string());
rreq->to_string());

RD_LOGT("Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}", originator,
rreq->to_compact_string(), rreq->remote_blkid().blkid.to_string(), server_id());
rreq->to_string(), rreq->remote_blkid().blkid.to_string(), server_id());
}

builder->FinishSizePrefixed(
Expand All @@ -571,7 +571,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
auto const fetch_latency_us = get_elapsed_time_us(fetch_start_time);
HISTOGRAM_OBSERVE(m_metrics, rreq_data_fetch_latency_us, fetch_latency_us);

RD_LOGD("Data Channel: FetchData from remote completed, time taken={} ms", fetch_latency_us);
RD_LOGD("Data Channel: FetchData from remote completed, time taken={} us", fetch_latency_us);

if (!response) {
// if we are here, it means the original who sent the log entries are down.
Expand Down Expand Up @@ -703,13 +703,13 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size();

if (!rreq->save_fetched_data(response, raw_data, data_size)) {
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string());
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string());
auto const local_size = rreq->local_blkid().blk_count() * get_blk_size();
RD_DBG_ASSERT_EQ(data_size, local_size, "Data size mismatch for rreq={} remote size: {}, local size: {}",
rreq->to_compact_string(), data_size, local_size);
rreq->to_string(), data_size, local_size);

RD_LOGD("Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.",
rreq->to_compact_string());
rreq->to_string());
} else {
auto const data_write_start_time = Clock::now();
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
Expand All @@ -734,11 +734,11 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons

RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, "
"total_write_latency_us={}, write_num_pieces={}",
rreq->to_compact_string(), data_write_latency, total_data_write_latency, write_num_pieces);
rreq->to_string(), data_write_latency, total_data_write_latency, write_num_pieces);
});

RD_LOGD("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_compact_string(), data_size, total_size, rreq->local_blkid().to_string());
rreq->to_string(), data_size, total_size, rreq->local_blkid().to_string());
}
raw_data += data_size;
total_size -= data_size;
Expand Down Expand Up @@ -796,7 +796,7 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)

if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) {
// Free the blks which is allocated already
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err);
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) {
Expand Down Expand Up @@ -1031,6 +1031,12 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto raft_req = r_cast< nuraft::req_msg* >(param->ctx);
auto const& entries = raft_req->log_entries();

auto start_lsn = raft_req->get_last_log_idx() + 1;
RD_LOGD("Raft channel: Received {} append entries on follower from leader, term {}, lsn {} ~ {} , my commited "
"lsn {} , leader commmited lsn {}",
entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1,
m_commit_upto_lsn.load(), raft_req->get_commit_idx());

if (!entries.empty()) {
RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them",
entries.size());
Expand Down Expand Up @@ -1166,11 +1172,15 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req");
auto rreq = it->second;
RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string());
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
(entry_blkid.blk_count() * get_blk_size()));
rreq->set_local_blkid(entry_blkid);

if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) {
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
(entry_blkid.blk_count() * get_blk_size()));
rreq->set_local_blkid(entry_blkid);
}

rreq->set_lsn(repl_lsn);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

Expand All @@ -1188,4 +1198,11 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx

void RaftReplDev::on_restart() { m_listener->on_restart(); }

bool RaftReplDev::is_resync_mode() {
int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx();
int64_t const my_log_idx = raft_server()->get_last_log_idx();
auto diff = leader_commited_lsn - my_log_idx;
return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
}

} // namespace homestore
3 changes: 1 addition & 2 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ class RaftReplDev : public ReplDev,

iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
bool m_resync_mode{false};
Clock::time_point m_destroyed_time;
folly::Promise< ReplServiceError > m_destroy_promise;
RaftReplDevMetrics m_metrics;
Expand Down Expand Up @@ -255,7 +254,7 @@ class RaftReplDev : public ReplDev,
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs);
bool is_resync_mode() { return m_resync_mode; }
bool is_resync_mode();
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
Expand Down
Loading