Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commit blk during log replay in rep dev #524

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.52"
version = "6.4.53"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
4 changes: 3 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ template < typename T >
using ptr = std::shared_ptr< T >;

class buffer;
class log_entry;
} // namespace nuraft

namespace homestore {
Expand Down Expand Up @@ -149,7 +150,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

raft_buf_ptr_t& raft_journal_buf();
uint8_t* raw_journal_buf();

/////////////////////// Non modifiers methods //////////////////
std::string to_string() const;
std::string to_compact_string() const;
Expand Down Expand Up @@ -203,6 +203,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
bool add_state_if_not_already(repl_req_state_t s);
void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; }
void clear();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }
Expand Down Expand Up @@ -234,6 +235,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
repl_journal_entry* m_journal_entry{nullptr}; // pointer to the journal entry
bool m_is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote
nuraft::ptr< nuraft::log_entry > m_lentry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to put a comment here that this is only needed during recovery


/////////////// Replication state related section /////////////////
std::atomic< uint32_t > m_state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request
Expand Down
24 changes: 19 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
}

RD_LOG(INFO,
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} "
"next_dsn={} "
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, "
"compact_lsn={}, checkpoint_lsn:{}, next_dsn={} "
"log_dev={} log_store={}",
(load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(),
m_rd_sb->logdev_id, m_rd_sb->logstore_id);

#ifdef _PRERELEASE
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
Expand Down Expand Up @@ -746,7 +747,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
void RaftReplDev::commit_blk(repl_req_ptr_t rreq) {
if (rreq->local_blkid().is_valid()) {
if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) {
if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done)
Expand All @@ -755,6 +756,10 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
RD_DBG_ASSERT(false, "fail to commit blk when applying log in non-degraded mode.")
}
}
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
commit_blk(rreq);

// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
Expand Down Expand Up @@ -1181,10 +1186,14 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR, but could impact some of the assert in underlying layer, is SM long run currently use debug build or release build?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug build

rreq->set_lsn(repl_lsn);
// keep lentry in scope for the lyfe cycle of the rreq
raakella1 marked this conversation as resolved.
Show resolved Hide resolved
rreq->set_lentry(lentry);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit hacky as we dont set it in normal IO path , we believe nuraft will not release them during the whole lifecycle till commit().

I would vote even for normal operations we keep lentry shared_ptr for safety and more consistent behavior.

rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

if (repl_lsn > m_rd_sb->durable_commit_lsn) {
// In memory state of these blks is lost. Commit them now to avoid usage of same blk twice.
commit_blk(rreq);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we dirty the allocator with these blks and still commit them in handle_commit()?. Just in the case for rollback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to dirty the blks without allocating them?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not yet but we can change dataservice api?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also implement roll_back to free blk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we dirty the allocator with these blks and still commit them in handle_commit()?. Just in the case for rollback.

Just be noted commit_blk will be expecting blks in allocated state for non-recovery mode (existing handle_commit), and for recovery mode, it will try to reserve on cache also before committing to disk; Need a bit more handling to combine them.

m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn));
return;
}
Expand All @@ -1202,7 +1211,12 @@ bool RaftReplDev::is_resync_mode() {
int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx();
int64_t const my_log_idx = raft_server()->get_last_log_idx();
auto diff = leader_commited_lsn - my_log_idx;
return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
bool resync_mode = (diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold));
if (resync_mode) {
RD_LOGD("Raft Channel: Resync mode, leader_commited_lsn={}, my_log_idx={}, diff={}", leader_commited_lsn,
my_log_idx, diff);
}
return resync_mode;
}

} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class RaftReplDev : public ReplDev,
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
};

} // namespace homestore
Loading