Skip to content

Commit

Permalink
Merge pull request #524 from raakella1/commit_blk
Browse files Browse the repository at this point in the history
commit blk during log replay in rep dev
  • Loading branch information
raakella1 authored Aug 28, 2024
2 parents 5b26248 + 7652028 commit 8d6d70b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.52"
version = "6.4.53"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
4 changes: 3 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ template < typename T >
using ptr = std::shared_ptr< T >;

class buffer;
class log_entry;
} // namespace nuraft

namespace homestore {
Expand Down Expand Up @@ -149,7 +150,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

raft_buf_ptr_t& raft_journal_buf();
uint8_t* raw_journal_buf();

/////////////////////// Non modifiers methods //////////////////
std::string to_string() const;
std::string to_compact_string() const;
Expand Down Expand Up @@ -203,6 +203,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
bool add_state_if_not_already(repl_req_state_t s);
void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; }
void clear();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }
Expand Down Expand Up @@ -234,6 +235,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
repl_journal_entry* m_journal_entry{nullptr}; // pointer to the journal entry
bool m_is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote
nuraft::ptr< nuraft::log_entry > m_lentry;

/////////////// Replication state related section /////////////////
std::atomic< uint32_t > m_state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request
Expand Down
24 changes: 19 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
}

RD_LOG(INFO,
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} "
"next_dsn={} "
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, "
"compact_lsn={}, checkpoint_lsn:{}, next_dsn={} "
"log_dev={} log_store={}",
(load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(),
m_rd_sb->logdev_id, m_rd_sb->logstore_id);

#ifdef _PRERELEASE
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
Expand Down Expand Up @@ -746,7 +747,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
void RaftReplDev::commit_blk(repl_req_ptr_t rreq) {
if (rreq->local_blkid().is_valid()) {
if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) {
if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done)
Expand All @@ -755,6 +756,10 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
RD_DBG_ASSERT(false, "fail to commit blk when applying log in non-degraded mode.")
}
}
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
commit_blk(rreq);

// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
Expand Down Expand Up @@ -1181,10 +1186,14 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
}

rreq->set_lsn(repl_lsn);
// keep lentry in scope for the lyfe cycle of the rreq
rreq->set_lentry(lentry);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

if (repl_lsn > m_rd_sb->durable_commit_lsn) {
// In memory state of these blks is lost. Commit them now to avoid usage of same blk twice.
commit_blk(rreq);
m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn));
return;
}
Expand All @@ -1202,7 +1211,12 @@ bool RaftReplDev::is_resync_mode() {
int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx();
int64_t const my_log_idx = raft_server()->get_last_log_idx();
auto diff = leader_commited_lsn - my_log_idx;
return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
bool resync_mode = (diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold));
if (resync_mode) {
RD_LOGD("Raft Channel: Resync mode, leader_commited_lsn={}, my_log_idx={}, diff={}", leader_commited_lsn,
my_log_idx, diff);
}
return resync_mode;
}

} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class RaftReplDev : public ReplDev,
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
};

} // namespace homestore

0 comments on commit 8d6d70b

Please sign in to comment.