diff --git a/conanfile.py b/conanfile.py index c31e269b7..30d53f709 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.50" + version = "6.4.51" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index b17927d26..24aae9f20 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -261,6 +261,9 @@ table Consensus { // Frequency to flush durable commit LSN in millis flush_durable_commit_interval_ms: uint64 = 500; + + // Log difference to determine if the follower is in resync mode + resync_log_idx_threshold: int64 = 100; } table HomeStoreSettings { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index dd20354d7..bad06c16b 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -291,7 +291,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d } if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) { - RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string()); + RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_string()); return; } @@ -328,7 +328,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d RD_LOGD("Data Channel: Data write completed for rreq=[{}], time_diff_data_log_us={}, " "data_write_latency_us={}, total_data_write_latency_us(rreq creation to write complete)={}, " "local_blkid.num_pieces={}", - rreq->to_compact_string(), data_log_diff_us, data_write_latency, total_data_write_latency, + rreq->to_string(), data_log_diff_us, data_write_latency, total_data_write_latency, write_num_pieces); } }); @@ -385,8 +385,7 @@ 
repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ return nullptr; } - RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), - reinterpret_cast< uintptr_t >(rreq.get())); + RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_string(), reinterpret_cast< uintptr_t >(rreq.get())); return rreq; } @@ -400,7 +399,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< if (!rreq->has_linked_data()) { continue; } auto const status = uint32_cast(rreq->state()); if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); + RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string()); continue; } @@ -443,7 +442,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN), "Data written promise raised without updating DATA_WRITTEN state for rkey={}", rreq->rkey().to_string()); - RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); + RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string()); } #endif RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size()); @@ -498,11 +497,11 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq auto const cur_state = uint32_cast(rreq->state()); if (cur_state == uint32_cast(repl_req_state_t::ERRORED)) { // We already received the data before, just ignore this data - RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_compact_string()); + RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_string()); continue; } else if (cur_state == uint32_cast(repl_req_state_t::DATA_RECEIVED)) { // We already received the data before, just ignore this data 
- RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_compact_string()); + RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_string()); continue; } @@ -526,7 +525,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector< ::flatbuffers::Offset< RequestEntry > > entries; + std::vector<::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -544,10 +543,10 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { // but if there were to be such case, we need to group rreqs by originator and send them in separate // batches; RD_DBG_ASSERT_EQ(rreq->remote_blkid().server_id, originator, "Unexpected originator for rreq={}", - rreq->to_compact_string()); + rreq->to_string()); RD_LOGT("Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}", originator, - rreq->to_compact_string(), rreq->remote_blkid().blkid.to_string(), server_id()); + rreq->to_string(), rreq->remote_blkid().blkid.to_string(), server_id()); } builder->FinishSizePrefixed( @@ -571,7 +570,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { auto const fetch_latency_us = get_elapsed_time_us(fetch_start_time); HISTOGRAM_OBSERVE(m_metrics, rreq_data_fetch_latency_us, fetch_latency_us); - RD_LOGD("Data Channel: FetchData from remote completed, time taken={} ms", fetch_latency_us); + RD_LOGD("Data Channel: FetchData from remote completed, time taken={} us", fetch_latency_us); if (!response) { // if we are here, it means the original who sent the log entries are down. 
@@ -703,13 +702,13 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size(); if (!rreq->save_fetched_data(response, raw_data, data_size)) { - RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string()); + RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string()); auto const local_size = rreq->local_blkid().blk_count() * get_blk_size(); RD_DBG_ASSERT_EQ(data_size, local_size, "Data size mismatch for rreq={} remote size: {}, local size: {}", - rreq->to_compact_string(), data_size, local_size); + rreq->to_string(), data_size, local_size); RD_LOGD("Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", - rreq->to_compact_string()); + rreq->to_string()); } else { auto const data_write_start_time = Clock::now(); COUNTER_INCREMENT(m_metrics, total_write_cnt, 1); @@ -734,11 +733,11 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, " "total_write_latency_us={}, write_num_pieces={}", - rreq->to_compact_string(), data_write_latency, total_data_write_latency, write_num_pieces); + rreq->to_string(), data_write_latency, total_data_write_latency, write_num_pieces); }); RD_LOGD("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}", - rreq->to_compact_string(), data_size, total_size, rreq->local_blkid().to_string()); + rreq->to_string(), data_size, total_size, rreq->local_blkid().to_string()); } raw_data += data_size; total_size -= data_size; @@ -796,7 +795,7 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) { // Free the blks which is allocated already - RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", 
rreq->to_compact_string(), err); + RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err); if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { auto blkid = rreq->local_blkid(); data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { @@ -1031,6 +1030,12 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu auto raft_req = r_cast< nuraft::req_msg* >(param->ctx); auto const& entries = raft_req->log_entries(); + auto start_lsn = raft_req->get_last_log_idx() + 1; + RD_LOGD("Raft channel: Received {} append entries on follower from leader, term {}, lsn {} ~ {} , my committed " + "lsn {} , leader committed lsn {}", + entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1, + m_commit_upto_lsn.load(), raft_req->get_commit_idx()); + if (!entries.empty()) { RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them", entries.size()); @@ -1166,12 +1171,17 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req"); auto rreq = it->second; RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string()); - MultiBlkId entry_blkid; - entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); - rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), - (entry_blkid.blk_count() * get_blk_size())); - rreq->set_local_blkid(entry_blkid); + uint32_t data_size{0u}; + + if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) { + MultiBlkId entry_blkid; + entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); + data_size = entry_blkid.blk_count() * get_blk_size(); + rreq->set_local_blkid(entry_blkid); + } + rreq->set_lsn(repl_lsn); + rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
data_size); RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string()); if (repl_lsn > m_rd_sb->durable_commit_lsn) { @@ -1188,4 +1198,11 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx void RaftReplDev::on_restart() { m_listener->on_restart(); } +bool RaftReplDev::is_resync_mode() { + int64_t const leader_committed_lsn = raft_server()->get_leader_committed_log_idx(); + int64_t const my_log_idx = raft_server()->get_last_log_idx(); + auto diff = leader_committed_lsn - my_log_idx; + return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold); +} + } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 886692a16..a68e3f578 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -135,7 +135,6 @@ class RaftReplDev : public ReplDev, iomgr::timer_handle_t m_wait_data_timer_hdl{ iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown; - bool m_resync_mode{false}; Clock::time_point m_destroyed_time; folly::Promise< ReplServiceError > m_destroy_promise; RaftReplDevMetrics m_metrics; @@ -255,7 +254,7 @@ class RaftReplDev : public ReplDev, void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data); void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs); void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs); - bool is_resync_mode() { return m_resync_mode; } + bool is_resync_mode(); void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);