Skip to content

Commit

Permalink
Merge pull request #445 from raakella1/durable_commit_regression
Browse files Browse the repository at this point in the history
register log store callbacks after m_log_store is available
  • Loading branch information
raakella1 authored Jun 14, 2024
2 parents 08a9200 + 3420f16 commit 718d169
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 27 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.16"
version = "6.4.17"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
24 changes: 12 additions & 12 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ void HomeRaftLogStore::truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_ls
}
}

HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) {
HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id, log_found_cb_t const& log_found_cb,
log_replay_done_cb_t const& log_replay_done_cb) {
m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log);

if (logstore_id == UINT32_MAX) {
Expand All @@ -104,13 +105,16 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore
m_logstore_id = logstore_id;
LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id);
logstore_service().open_logdev(m_logdev_id);
m_log_store_future =
logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) {
m_log_store = std::move(log_store);
DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(),
"Mismatch in passed and create logstore id");
REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully");
});
m_log_store_future = logstore_service()
.open_log_store(m_logdev_id, logstore_id, true)
.thenValue([this, log_found_cb, log_replay_done_cb](auto log_store) {
m_log_store = std::move(log_store);
DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(),
"Mismatch in passed and create logstore id");
m_log_store->register_log_found_cb(log_found_cb);
m_log_store->register_log_replay_done_cb(log_replay_done_cb);
REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully");
});
}
}

Expand Down Expand Up @@ -314,8 +318,4 @@ ulong HomeRaftLogStore::last_durable_index() {

void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); }

void HomeRaftLogStore::register_log_replay_done_cb(const log_replay_done_cb_t& cb) {
m_log_store->register_log_replay_done_cb(cb);
};

} // namespace homestore
7 changes: 3 additions & 4 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

class HomeRaftLogStore : public nuraft::log_store {
public:
HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX);
HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX,
log_found_cb_t const& log_found_cb = nullptr,
log_replay_done_cb_t const& log_replay_done_cb = nullptr);
virtual ~HomeRaftLogStore() = default;

void remove_store();
Expand Down Expand Up @@ -189,9 +191,6 @@ class HomeRaftLogStore : public nuraft::log_store {
void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn);

void wait_for_log_store_ready();
void register_log_found_cb(const log_found_cb_t& cb) { m_log_store->register_log_found_cb(cb); }

void register_log_replay_done_cb(const log_replay_done_cb_t& cb);

private:
logstore_id_t m_logstore_id;
Expand Down
8 changes: 2 additions & 6 deletions src/lib/replication/log_store/repl_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ class ReplLogStore : public HomeRaftLogStore {

public:
template < typename... Args >
ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, const log_found_cb_t& log_found_cb,
const log_replay_done_cb_t& log_replay_done_cb, Args&&... args) :
HomeRaftLogStore{std::forward< Args >(args)...}, m_rd{rd}, m_sm{sm} {
register_log_found_cb(log_found_cb);
register_log_replay_done_cb(log_replay_done_cb);
}
ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, Args&&... args) :
HomeRaftLogStore(std::forward< Args >(args)...), m_rd{rd}, m_sm{sm} {}

//////////////////////// function override ////////////////////////
uint64_t append(nuraft::ptr< nuraft::log_entry >& entry) override;
Expand Down
11 changes: 7 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk

if (load_existing) {
m_data_journal = std::make_shared< ReplLogStore >(
*this, *m_state_machine,
*this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id,
[this](logstore_seq_num_t lsn, log_buffer buf, void* key) { on_log_found(lsn, buf, key); },
[this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; },
m_rd_sb->logdev_id, m_rd_sb->logstore_id);
[this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; });
m_next_dsn = m_rd_sb->last_applied_dsn + 1;
m_commit_upto_lsn = m_rd_sb->durable_commit_lsn;
m_last_flushed_commit_lsn = m_commit_upto_lsn;
Expand All @@ -62,7 +61,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
});
}
} else {
m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine, nullptr, nullptr);
m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine);
m_rd_sb->logdev_id = m_data_journal->logdev_id();
m_rd_sb->logstore_id = m_data_journal->logstore_id();
m_rd_sb->last_applied_dsn = 0;
Expand Down Expand Up @@ -1039,6 +1038,10 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx

// 1. Get the log entry and prepare rreq
nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes());

// TODO: Handle the case where the log entry is not app_log, example config logs
if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }

repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin());
RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
"Mismatched version of journal entry received from RAFT peer");
Expand Down

0 comments on commit 718d169

Please sign in to comment.