Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert log store lsn to repl lsn in repl_dev #505

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.46"
version = "6.4.47"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
1 change: 0 additions & 1 deletion src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ static constexpr logstore_seq_num_t to_store_lsn(uint64_t raft_lsn) {
static constexpr logstore_seq_num_t to_store_lsn(repl_lsn_t repl_lsn) {
return static_cast< logstore_seq_num_t >(repl_lsn - 1);
}
static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; }

static uint64_t extract_term(const log_buffer& log_bytes) {
uint8_t const* raw_ptr = log_bytes.bytes();
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,6 @@ static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(sisl::blob const& lo
static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(const log_buffer& log_bytes) {
return to_nuraft_log_entry(log_bytes.get_blob());
}

static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; }
} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ void repl_req_ctx::set_lsn(int64_t lsn) {
DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn),
"Changing lsn for request={} on the fly can cause race condition, not expected", to_string());
m_lsn = lsn;
LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string());
}

bool repl_req_ctx::save_pushed_data(intrusive< sisl::GenericRpcData > const& pushed_data, uint8_t const* data,
Expand Down
34 changes: 20 additions & 14 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
}

RD_LOG(INFO,
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={} "
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} "
"next_dsn={} "
"log_dev={} log_store={}",
(load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);

#ifdef _PRERELEASE
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
Expand Down Expand Up @@ -230,20 +231,20 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance())
.thenValue([this, rreq = std::move(rreq)](auto e) {
if (e.hasError()) {
RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}",
rreq->to_compact_string(), e.error());
RD_LOGE("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", rreq->to_string(),
e.error());
handle_error(rreq, RaftReplService::to_repl_error(e.error()));
return;
}
// Release the buffer which holds the packets
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
Expand Down Expand Up @@ -766,7 +767,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
m_next_dsn.compare_exchange_strong(cur_dsn, rreq->dsn() + 1);
}

RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_compact_string());
RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string());
if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
leave();
} else {
Expand All @@ -775,7 +776,9 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {

if (!recovery) {
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev");
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
rreq->lsn(), prev_lsn);
}
if (!rreq->is_proposer()) { rreq->clear(); }
}
Expand Down Expand Up @@ -1097,7 +1100,8 @@ void RaftReplDev::gc_repl_reqs() {

if (rreq->is_expired()) {
expired_keys.push_back(key);
RD_LOGD("rreq=[{}] is expired, cleaning up", rreq->to_compact_string());
RD_LOGD("rreq=[{}] is expired, cleaning up; elapsed_time_sec{};", rreq->to_string(),
get_elapsed_time_sec(rreq->created_time()));

// do garbage collection
// 1. free the allocated blocks
Expand All @@ -1124,8 +1128,9 @@ void RaftReplDev::gc_repl_reqs() {
}

void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) {
auto repl_lsn = to_repl_lsn(lsn);
// apply the log entry if the lsn is between checkpoint lsn and durable commit lsn
if (lsn < m_rd_sb->checkpoint_lsn) { return; }
if (repl_lsn < m_rd_sb->checkpoint_lsn) { return; }

// 1. Get the log entry and prepare rreq
auto const lentry = to_nuraft_log_entry(buf);
Expand Down Expand Up @@ -1166,15 +1171,16 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
(entry_blkid.blk_count() * get_blk_size()));
rreq->set_local_blkid(entry_blkid);
rreq->set_lsn(lsn);
rreq->set_lsn(repl_lsn);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

if (lsn > m_rd_sb->durable_commit_lsn) {
m_state_machine->link_lsn_to_req(rreq, int64_cast(lsn));
if (repl_lsn > m_rd_sb->durable_commit_lsn) {
m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn));
return;
}

// 2. Pre-commit the log entry
m_listener->on_pre_commit(lsn, entry_to_hdr(jentry), entry_to_key(jentry), nullptr);
m_listener->on_pre_commit(repl_lsn, entry_to_hdr(jentry), entry_to_key(jentry), nullptr);

// 3. Commit the log entry
handle_commit(rreq, true /* recovery */);
Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
repl_req_ptr_t rreq = lsn_to_req(lsn);
RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq");
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq for lsn={}", lsn);
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_string());
if (rreq->is_proposer()) {
// This is the time to ensure flushing of journal happens in the proposer
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
Expand Down
Loading