Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

register log store callbacks after m_log_store is available #445

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.16"
version = "6.4.17"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
24 changes: 12 additions & 12 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ void HomeRaftLogStore::truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_ls
}
}

HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) {
HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id, log_found_cb_t const& log_found_cb,
log_replay_done_cb_t const& log_replay_done_cb) {
m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log);

if (logstore_id == UINT32_MAX) {
Expand All @@ -104,13 +105,16 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore
m_logstore_id = logstore_id;
LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id);
logstore_service().open_logdev(m_logdev_id);
m_log_store_future =
logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) {
m_log_store = std::move(log_store);
DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(),
"Mismatch in passed and create logstore id");
REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully");
});
m_log_store_future = logstore_service()
.open_log_store(m_logdev_id, logstore_id, true)
.thenValue([this, log_found_cb, log_replay_done_cb](auto log_store) {
m_log_store = std::move(log_store);
DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(),
"Mismatch in passed and create logstore id");
m_log_store->register_log_found_cb(log_found_cb);
m_log_store->register_log_replay_done_cb(log_replay_done_cb);
REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully");
});
}
}

Expand Down Expand Up @@ -314,8 +318,4 @@ ulong HomeRaftLogStore::last_durable_index() {

void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); }

void HomeRaftLogStore::register_log_replay_done_cb(const log_replay_done_cb_t& cb) {
m_log_store->register_log_replay_done_cb(cb);
};

} // namespace homestore
7 changes: 3 additions & 4 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

class HomeRaftLogStore : public nuraft::log_store {
public:
HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX);
HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX,
log_found_cb_t const& log_found_cb = nullptr,
log_replay_done_cb_t const& log_replay_done_cb = nullptr);
virtual ~HomeRaftLogStore() = default;

void remove_store();
Expand Down Expand Up @@ -189,9 +191,6 @@ class HomeRaftLogStore : public nuraft::log_store {
void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn);

void wait_for_log_store_ready();
void register_log_found_cb(const log_found_cb_t& cb) { m_log_store->register_log_found_cb(cb); }

void register_log_replay_done_cb(const log_replay_done_cb_t& cb);

private:
logstore_id_t m_logstore_id;
Expand Down
8 changes: 2 additions & 6 deletions src/lib/replication/log_store/repl_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ class ReplLogStore : public HomeRaftLogStore {

public:
template < typename... Args >
ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, const log_found_cb_t& log_found_cb,
const log_replay_done_cb_t& log_replay_done_cb, Args&&... args) :
HomeRaftLogStore{std::forward< Args >(args)...}, m_rd{rd}, m_sm{sm} {
register_log_found_cb(log_found_cb);
register_log_replay_done_cb(log_replay_done_cb);
}
ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, Args&&... args) :
HomeRaftLogStore(std::forward< Args >(args)...), m_rd{rd}, m_sm{sm} {}

//////////////////////// function override ////////////////////////
uint64_t append(nuraft::ptr< nuraft::log_entry >& entry) override;
Expand Down
11 changes: 7 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk

if (load_existing) {
m_data_journal = std::make_shared< ReplLogStore >(
*this, *m_state_machine,
*this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id,
[this](logstore_seq_num_t lsn, log_buffer buf, void* key) { on_log_found(lsn, buf, key); },
[this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; },
m_rd_sb->logdev_id, m_rd_sb->logstore_id);
[this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; });
m_next_dsn = m_rd_sb->last_applied_dsn + 1;
m_commit_upto_lsn = m_rd_sb->durable_commit_lsn;
m_last_flushed_commit_lsn = m_commit_upto_lsn;
Expand All @@ -62,7 +61,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
});
}
} else {
m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine, nullptr, nullptr);
m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine);
m_rd_sb->logdev_id = m_data_journal->logdev_id();
m_rd_sb->logstore_id = m_data_journal->logstore_id();
m_rd_sb->last_applied_dsn = 0;
Expand Down Expand Up @@ -1039,6 +1038,10 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx

// 1. Get the log entry and prepare rreq
nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes());

// TODO: Handle the case where the log entry is not app_log, example config logs
if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }

repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin());
RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
"Mismatched version of journal entry received from RAFT peer");
Expand Down
Loading