Skip to content

Commit

Permalink
add lsn check before append entry
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Aug 30, 2024
1 parent fc1e7a2 commit e869041
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.54"
version = "6.4.55"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
6 changes: 4 additions & 2 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) {
m_log_store->foreach (
to_store_lsn(index),
[this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool {
size_t const total_entry_size = entry.size() + sizeof(uint32_t);
if (remain_cnt-- > 0) {
size_t avail_size = out_buf->size() - out_buf->pos();
if (avail_size < entry.size() + sizeof(uint32_t)) {
avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t));
// available size of packing buffer should be able to hold entry.size() and the length of this entry
if (avail_size < total_entry_size) {
avail_size += std::max(out_buf->size() * 2, total_entry_size);
out_buf = nuraft::buffer::expand(*out_buf, avail_size);
}
REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur),
Expand Down
18 changes: 10 additions & 8 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -1041,6 +1041,14 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
entries.size(), raft_req->get_last_log_term(), start_lsn, start_lsn + entries.size() - 1,
m_commit_upto_lsn.load(), raft_req->get_commit_idx());

if (start_lsn != m_data_journal->next_slot()) {
// if the start_lsn of this batch does not match the next_slot, drop this log batch
// this will happen when the leader is sending the logs which are already received and appended
RD_LOGD("start_lsn={} does not match the next_slot={}, dropping the log batch", start_lsn,
m_data_journal->next_slot());
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}

if (!entries.empty()) {
RD_LOGT("Raft channel: Received {} append entries on follower from leader, localizing them",
entries.size());
Expand All @@ -1050,13 +1058,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
// TODO :: we need to indentify whether this log entry should be appended to log store.
// 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we
// should skip it.
// 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been
// committed, we should skip it.
// here, we only check the first condition for now. revisit here if we need to check the second
if (req == nullptr || req->lsn() != -1) {
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class RaftReplDev : public ReplDev,
repl_lsn_t m_last_flushed_commit_lsn{0}; // LSN upto which it was flushed to persistent store
iomgr::timer_handle_t m_sb_flush_timer_hdl;

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
std::atomic< uint64_t > m_next_dsn{1}; // Data Sequence Number that will keep incrementing for each data entry

iomgr::timer_handle_t m_wait_data_timer_hdl{
iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown;
Expand Down
5 changes: 4 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ void RaftStateMachine::become_ready() { m_rd.become_ready(); }

void RaftStateMachine::unlink_lsn_to_req(int64_t lsn) {
auto const it = m_lsn_req_map.find(lsn);
if (it != m_lsn_req_map.cend()) { m_lsn_req_map.erase(lsn); }
if (it != m_lsn_req_map.cend()) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, it->second->to_string());
m_lsn_req_map.erase(lsn);
}
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ class RaftReplDevTest : public testing::Test {
// destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at
// raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now.
if (i++ > 10 && !force_leave) {
LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave");
LOGWARN("has already waited for repl dev to get destroyed for 10 times, so do a force leave");
repl_dev->force_leave();
force_leave = true;
}
Expand Down

0 comments on commit e869041

Please sign in to comment.