diff --git a/conanfile.py b/conanfile.py index 79ec4b8d4..e23c7f9d7 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.16" + version = "6.4.17" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index b6922ffac..80e08d809 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -90,7 +90,8 @@ void HomeRaftLogStore::truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_ls } } -HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id) { +HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore_id, log_found_cb_t const& log_found_cb, + log_replay_done_cb_t const& log_replay_done_cb) { m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); if (logstore_id == UINT32_MAX) { @@ -104,13 +105,16 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_logstore_id = logstore_id; LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); - m_log_store_future = - logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { - m_log_store = std::move(log_store); - DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), - "Mismatch in passed and create logstore id"); - REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); - }); + m_log_store_future = logstore_service() + .open_log_store(m_logdev_id, logstore_id, true) + .thenValue([this, log_found_cb, log_replay_done_cb](auto log_store) { + m_log_store = std::move(log_store); + DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), + "Mismatch in passed and create logstore id"); + m_log_store->register_log_found_cb(log_found_cb); + m_log_store->register_log_replay_done_cb(log_replay_done_cb); + REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); + }); } } @@ -314,8 +318,4 @@ ulong HomeRaftLogStore::last_durable_index() { void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); } -void HomeRaftLogStore::register_log_replay_done_cb(const log_replay_done_cb_t& cb) { - m_log_store->register_log_replay_done_cb(cb); -}; - } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index 364ea20a6..5e6e589ad 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -33,7 +33,9 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class HomeRaftLogStore : public nuraft::log_store { public: - HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX); + HomeRaftLogStore(logdev_id_t logdev_id = UINT32_MAX, homestore::logstore_id_t logstore_id = UINT32_MAX, + log_found_cb_t const& log_found_cb = nullptr, + log_replay_done_cb_t const& log_replay_done_cb = nullptr); virtual ~HomeRaftLogStore() = default; void remove_store(); @@ -189,9 +191,6 @@ class HomeRaftLogStore : public nuraft::log_store { void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn); void wait_for_log_store_ready(); - void register_log_found_cb(const log_found_cb_t& cb) { m_log_store->register_log_found_cb(cb); } - - void register_log_replay_done_cb(const log_replay_done_cb_t& cb); private: logstore_id_t m_logstore_id; diff --git a/src/lib/replication/log_store/repl_log_store.h b/src/lib/replication/log_store/repl_log_store.h index 7a270687b..a386d397b 100644 --- a/src/lib/replication/log_store/repl_log_store.h +++ b/src/lib/replication/log_store/repl_log_store.h @@ -19,12 +19,8 @@ class ReplLogStore : public HomeRaftLogStore { public: template < typename... Args > - ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, const log_found_cb_t& log_found_cb, - const log_replay_done_cb_t& log_replay_done_cb, Args&&... args) : - HomeRaftLogStore{std::forward< Args >(args)...}, m_rd{rd}, m_sm{sm} { - register_log_found_cb(log_found_cb); - register_log_replay_done_cb(log_replay_done_cb); - } + ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, Args&&... args) : + HomeRaftLogStore(std::forward< Args >(args)...), m_rd{rd}, m_sm{sm} {} //////////////////////// function override //////////////////////// uint64_t append(nuraft::ptr< nuraft::log_entry >& entry) override; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b3cf0ae83..25f54a3c8 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -37,10 +37,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk if (load_existing) { m_data_journal = std::make_shared< ReplLogStore >( - *this, *m_state_machine, + *this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id, [this](logstore_seq_num_t lsn, log_buffer buf, void* key) { on_log_found(lsn, buf, key); }, - [this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; }, - m_rd_sb->logdev_id, m_rd_sb->logstore_id); + [this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) { m_log_store_replay_done = true; }); m_next_dsn = m_rd_sb->last_applied_dsn + 1; m_commit_upto_lsn = m_rd_sb->durable_commit_lsn; m_last_flushed_commit_lsn = m_commit_upto_lsn; @@ -62,7 +61,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk }); } } else { - m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine, nullptr, nullptr); + m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine); m_rd_sb->logdev_id = m_data_journal->logdev_id(); m_rd_sb->logstore_id = m_data_journal->logstore_id(); m_rd_sb->last_applied_dsn = 0; @@ -1039,6 +1038,10 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx // 1. Get the log entry and prepare rreq nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes()); + + // TODO: Handle the case where the log entry is not app_log, example config logs + if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; } + repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, "Mismatched version of journal entry received from RAFT peer");