From a6d6594d69ac490eda0213e4e975336afdcbfc6c Mon Sep 17 00:00:00 2001 From: Sanal P Date: Mon, 22 Jul 2024 15:22:17 -0700 Subject: [PATCH] Add baseline resync to support read and write snapshots. Add nuraft based snapshot sync across leader and followers. Implement the read and write snapshot callback. Add a generic snapshot structure to support different implemenations. Listener implements necessary function to load/save snapshot state. Listener also implements on how to read and write actual snapshot data. Added test to do basic baseline test. --- conanfile.py | 2 +- src/include/homestore/replication/repl_dev.h | 83 ++++++- src/lib/logstore/log_dev.cpp | 5 + .../log_store/home_raft_log_store.cpp | 24 +- .../log_store/home_raft_log_store.h | 1 + .../replication/log_store/repl_log_store.cpp | 6 +- .../replication/repl_dev/raft_repl_dev.cpp | 9 +- src/lib/replication/repl_dev/raft_repl_dev.h | 5 +- .../repl_dev/raft_state_machine.cpp | 63 ++++- .../replication/repl_dev/raft_state_machine.h | 8 +- src/tests/test_raft_repl_dev.cpp | 226 +++++++++++++++++- src/tests/test_solo_repl_dev.cpp | 10 +- 12 files changed, 409 insertions(+), 33 deletions(-) diff --git a/conanfile.py b/conanfile.py index d52f2fe88..8fb798180 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.37" + version = "6.4.38" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 4dd57ce2d..2e7746621 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -59,9 +59,60 @@ struct repl_key { std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); } }; -struct repl_snapshot { - uint64_t last_log_idx_{0}; - uint64_t last_log_term_{0}; +using repl_snapshot = nuraft::snapshot; +using repl_snapshot_ptr = nuraft::ptr< nuraft::snapshot >; + +// Consumers of the ReplDevListener dont have to know what underlying +// snapshot implementation is used. Consumers can export and save the state +// of the snapshot using serialize and load the state using deserialize. +class snapshot_context { +public: + snapshot_context(int64_t lsn) : lsn_(lsn) {} + virtual ~snapshot_context() = default; + virtual void deserialize(const sisl::io_blob_safe& snp_ctx) = 0; + virtual sisl::io_blob_safe serialize() = 0; + int64_t get_lsn() { return lsn_; } + +protected: + int64_t lsn_; +}; + +class nuraft_snapshot_context : public snapshot_context { +public: + nuraft_snapshot_context(nuraft::snapshot& snp) : snapshot_context(snp.get_last_log_idx()) { + auto snp_buf = snp.serialize(); + snapshot_ = nuraft::snapshot::deserialize(*snp_buf); + } + + void deserialize(const sisl::io_blob_safe& snp_ctx) override { + // Load the context from the io blob to nuraft buffer. + auto snp_buf = nuraft::buffer::alloc(snp_ctx.size()); + nuraft::buffer_serializer bs(snp_buf); + bs.put_raw(snp_ctx.cbytes(), snp_ctx.size()); + snapshot_ = nuraft::snapshot::deserialize(bs); + lsn_ = snapshot_->get_last_log_idx(); + } + + sisl::io_blob_safe serialize() override { + // Dump the context from nuraft buffer to the io blob. + auto snp_buf = snapshot_->serialize(); + sisl::io_blob_safe blob{s_cast< size_t >(snp_buf->size())}; + std::memcpy(blob.bytes(), snp_buf->data_begin(), snp_buf->size()); + return blob; + } + + nuraft::ptr< nuraft::snapshot > nuraft_snapshot() { return snapshot_; } + +private: + nuraft::ptr< nuraft::snapshot > snapshot_; +}; + +struct snapshot_data { + void* user_ctx{nullptr}; + int64_t offset{0}; + sisl::io_blob_safe blob; + bool is_first_obj{false}; + bool is_last_obj{false}; }; struct repl_journal_entry; @@ -285,8 +336,30 @@ class ReplDevListener { /// after restart in case crash happened during the destroy. virtual void on_destroy() = 0; - /// @brief Called when the snapshot is being created by nuraft; - virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0; + /// @brief Called when the snapshot is being created by nuraft + virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0; + + /// @brief Called when nuraft does the baseline resync and in the end apply snapshot. + virtual bool apply_snapshot(shared< snapshot_context > context) = 0; + + /// @brief Get the last snapshot saved. + virtual shared< snapshot_context > last_snapshot() = 0; + + /// @brief Called on the leader side when the follower wants to do baseline resync and leader + /// uses offset given by the follower to the know the current state of the follower. + /// Leader sends the snapshot data to the follower in batch. This callback is called multiple + /// times on the leader till all the data is transferred to the follower. is_last_obj in + /// snapshot_data will be true once all the data has been trasnferred. After this the raft on + /// the follower side can do the incremental resync. + virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + + /// @brief Called on the follower when the leader sends the data during the baseline resyc. + /// is_last_obj in in snapshot_data will be true once all the data has been transfered. + /// After this the raft on the follower side can do the incremental resync. + virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + + /// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data. + virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0; private: std::weak_ptr< ReplDev > m_repl_dev; diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 1bb6e1838..f35429e3c 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -390,6 +390,11 @@ bool LogDev::can_flush_in_this_thread() { // our threshold. If so, it first flushes whats accumulated so far and then add the pending flush size counter with // the new record size bool LogDev::flush_if_needed(int64_t threshold_size) { + { + std::unique_lock< std::mutex > lk{m_block_flush_q_mutex}; + if (m_stopped) { return false; } + } + // If after adding the record size, if we have enough to flush or if its been too much time before we actually // flushed, attempt to flush by setting the atomic bool variable. if (threshold_size < 0) { threshold_size = LogDev::flush_data_threshold_size(); } diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index a84dc9633..f49e5a48f 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -18,6 +18,7 @@ #include #include "common/homestore_assert.hpp" #include +#include using namespace homestore; @@ -165,6 +166,7 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) { store_lsn_t end_lsn = to_store_lsn(start + cnt - 1); m_log_store->flush_sync(end_lsn); + REPL_STORE_LOG(DEBUG, "end_of_append_batch flushed upto start={} cnt={} lsn={}", start, cnt, start + cnt - 1); m_last_durable_lsn = end_lsn; } @@ -172,9 +174,13 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore: auto out_vec = std::make_shared< std::vector< nuraft::ptr< nuraft::log_entry > > >(); m_log_store->foreach (to_store_lsn(start), [end, &out_vec](store_lsn_t cur, const log_buffer& entry) -> bool { bool ret = (cur < to_store_lsn(end) - 1); - if (cur < to_store_lsn(end)) { out_vec->emplace_back(to_nuraft_log_entry(entry)); } + if (cur < to_store_lsn(end)) { + // REPL_STORE_LOG(TRACE, "log_entries lsn={}", cur + 1); + out_vec->emplace_back(to_nuraft_log_entry(entry)); + } return ret; }); + REPL_STORE_LOG(TRACE, "Num log entries start={} end={} num_entries={}", start, end, out_vec->size()); return out_vec; } @@ -184,7 +190,7 @@ nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) { auto log_bytes = m_log_store->read_sync(to_store_lsn(index)); nle = to_nuraft_log_entry(log_bytes); } catch (const std::exception& e) { - REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range", index); + REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range start {} end {}", index, start_index(), last_index()); throw e; } return nle; @@ -196,7 +202,7 @@ ulong HomeRaftLogStore::term_at(ulong index) { auto log_bytes = m_log_store->read_sync(to_store_lsn(index)); term = extract_term(log_bytes); } catch (const std::exception& e) { - REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range", index); + REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range start {} end {}", index, start_index(), last_index()); throw e; } return term; @@ -271,7 +277,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { if (cur_max_lsn < to_store_lsn(compact_lsn)) { // release this assert if for some use case, we should tolorant this case; // for now, don't expect this case to happen. - RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn); + // RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn); // We need to fill the remaining entries with dummy data. for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) { @@ -284,7 +290,13 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { // we rely on resrouce mgr timer to trigger truncate for all log stores in system; // this will be friendly for multiple logstore on same logdev; - // m_log_store->truncate(to_store_lsn(compact_lsn)); +#ifdef _PRERELEASE + if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) { + REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}", + compact_lsn); + m_log_store->truncate(to_store_lsn(compact_lsn)); + } +#endif return true; } @@ -301,4 +313,6 @@ ulong HomeRaftLogStore::last_durable_index() { void HomeRaftLogStore::wait_for_log_store_ready() { m_log_store_future.wait(); } +void HomeRaftLogStore::set_last_durable_lsn(repl_lsn_t lsn) { m_last_durable_lsn = to_store_lsn(lsn); } + } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index c0850b2d4..bf1103537 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -191,6 +191,7 @@ class HomeRaftLogStore : public nuraft::log_store { void truncate(uint32_t num_reserved_cnt, repl_lsn_t compact_lsn); void wait_for_log_store_ready(); + void set_last_durable_lsn(repl_lsn_t lsn); private: logstore_id_t m_logstore_id; diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index bc872b807..4271d8b88 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -8,7 +8,7 @@ namespace homestore { uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) { // We don't want to transform anything that is not an app log - if (entry->get_val_type() != nuraft::log_val_type::app_log) { + if (entry->get_val_type() != nuraft::log_val_type::app_log || entry->get_buf_ptr()->size() == 0) { ulong lsn = HomeRaftLogStore::append(entry); RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(), static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size()); @@ -57,8 +57,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { } } - RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count, - reqs->size()); + RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count, + reqs->size(), proposer_reqs->size()); // All requests are from proposer for data write, so as mentioned above we can skip the flush for now if (!reqs->empty()) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 5867e386d..1cd7eb19e 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -127,6 +127,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() { LOGERROR("RaftReplDev::destroy_group failed {}", err); } + LOGINFO("Raft repl dev destroy_group={}", boost::uuids::to_string(m_group_id)); return m_destroy_promise.getSemiFuture(); } @@ -134,12 +135,11 @@ void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) { RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term()); - repl_snapshot snapshot{.last_log_idx_ = s.get_last_log_idx(), .last_log_term_ = s.get_last_log_term()}; - auto result = m_listener->create_snapshot(snapshot).get(); + auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); + auto result = m_listener->create_snapshot(snp_ctx).get(); auto null_except = std::shared_ptr< std::exception >(); HS_REL_ASSERT(result.hasError() == false, "Not expecting creating snapshot to return false. "); - m_last_snapshot = nuraft::cs_new< nuraft::snapshot >(s.get_last_log_idx(), s.get_last_log_term(), - s.get_last_config(), s.size(), s.get_type()); + auto ret_val{true}; if (when_done) { when_done(ret_val, null_except); } } @@ -1032,6 +1032,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); for (auto& entry : entries) { if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } + if (entry->get_buf_ptr()->size() == 0) { continue; } auto req = m_state_machine->localize_journal_entry_prepare(*entry); if (req == nullptr) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index a393d0f6b..34b3364a9 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -140,8 +140,6 @@ class RaftReplDev : public ReplDev, folly::Promise< ReplServiceError > m_destroy_promise; RaftReplDevMetrics m_metrics; - nuraft::ptr< nuraft::snapshot > m_last_snapshot{nullptr}; - static std::atomic< uint64_t > s_next_group_ordinal; bool m_log_store_replay_done{false}; @@ -170,6 +168,7 @@ class RaftReplDev : public ReplDev, std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); } uint32_t get_blk_size() const override; repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); } + void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); } bool is_destroy_pending() const; bool is_destroyed() const; Clock::time_point destroyed_time() const { return m_destroyed_time; } @@ -217,8 +216,6 @@ class RaftReplDev : public ReplDev, m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load()); } - nuraft::ptr< nuraft::snapshot > get_last_snapshot() { return m_last_snapshot; } - void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); } void gc_repl_reqs(); diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index b97b38945..8300fcce8 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -207,7 +207,10 @@ void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_p } } -uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } +uint64_t RaftStateMachine::last_commit_index() { + RD_LOG(DEBUG, "Raft channel: last_commit_index {}", uint64_cast(m_rd.get_last_commit_lsn())); + return uint64_cast(m_rd.get_last_commit_lsn()); +} void RaftStateMachine::become_ready() { m_rd.become_ready(); } @@ -240,7 +243,63 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result m_rd.on_create_snapshot(s, when_done); } +int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, + bool& is_last_obj) { + auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); + auto snp_data = std::make_shared< snapshot_data >(); + snp_data->user_ctx = user_ctx; + snp_data->offset = obj_id; + snp_data->is_last_obj = is_last_obj; + + // Listener will read the snapshot data and we pass through the same. + int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data); + if (ret < 0) return ret; + + // Update user_ctx and whether is_last_obj + user_ctx = snp_data->user_ctx; + is_last_obj = snp_data->is_last_obj; + + // We are doing a copy here. + data_out = nuraft::buffer::alloc(snp_data->blob.size()); + nuraft::buffer_serializer bs(data_out); + bs.put_raw(snp_data->blob.cbytes(), snp_data->blob.size()); + return ret; +} + +void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, + bool is_last_obj) { + auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); + auto snp_data = std::make_shared< snapshot_data >(); + snp_data->offset = obj_id; + snp_data->is_first_obj = is_first_obj; + snp_data->is_last_obj = is_last_obj; + + // We are doing a copy here. + sisl::io_blob_safe blob{s_cast< size_t >(data.size())}; + std::memcpy(blob.bytes(), data.data_begin(), data.size()); + snp_data->blob = std::move(blob); + + m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data); + + // Update the object offset. + obj_id = snp_data->offset; +} + +bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { + m_rd.set_last_commit_lsn(s.get_last_log_idx()); + m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx()); + auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); + return m_rd.m_listener->apply_snapshot(snp_ctx); +} + +nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_rd.m_listener->last_snapshot()); + if (s == nullptr) return nullptr; + return s->nuraft_snapshot(); +} + +void RaftStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { m_rd.m_listener->free_user_snp_ctx(user_snp_ctx); } + std::string RaftStateMachine::rdev_name() const { return m_rd.rdev_name(); } -nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { return m_rd.get_last_snapshot(); } } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index db870555a..1325bdeab 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -111,10 +111,14 @@ class RaftStateMachine : public nuraft::state_machine { void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); } void become_ready(); - bool apply_snapshot(nuraft::snapshot&) override { return false; } - void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override; + int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, + bool& is_last_obj) override; + void save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, + bool is_last_obj) override; + bool apply_snapshot(nuraft::snapshot& s) override; nuraft::ptr< nuraft::snapshot > last_snapshot() override; + void free_user_snp_ctx(void*& user_snp_ctx) override; ////////// APIs outside of nuraft::state_machine requirements //////////////////// ReplServiceError propose_to_raft(repl_req_ptr_t rreq); diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 3e3075190..2df66c70c 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include "common/homestore_config.hpp" @@ -81,6 +82,11 @@ class TestReplicatedDB : public homestore::ReplDevListener { MultiBlkId blkid_; }; + struct KeyValuePair { + Key key; + Value value; + }; + struct test_req : public repl_req_ctx { struct journal_header { uint64_t data_size; @@ -130,6 +136,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); + last_committed_lsn = lsn; ++commit_count_; } @@ -154,7 +161,110 @@ class TestReplicatedDB : public homestore::ReplDevListener { *(r_cast< uint64_t const* >(key.cbytes()))); } - AsyncReplResult<> create_snapshot(repl_snapshot& s) override { return make_async_success<>(); } + AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Got snapshot callback term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + m_last_snapshot = context; + return make_async_success<>(); + } + + int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx()); + + if (snp_data->offset == 0) { + snp_data->is_last_obj = false; + snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); + return 0; + } + int64_t follower_last_lsn = snp_data->offset; + std::vector< KeyValuePair > kv_snapshot_data; + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback follower lsn={}", g_helper->replica_num(), + follower_last_lsn); + for (auto& [k, v] : inmem_db_) { + if (v.lsn_ > follower_last_lsn) { + kv_snapshot_data.emplace_back(k, v); + LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} {} {}", + g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); + } + } + + int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); + LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size); + + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; + std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + snp_data->blob = std::move(blob); + snp_data->is_last_obj = true; + return 0; + } + + void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); + auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); + std::move(fut).get(); + for (auto const& iov : write_sgs.iovs) { + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + } + + void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}", + g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), + snp_data->is_last_obj); + + if (snp_data->offset == 0) { + // For obj_id 0 we sent back the last committed lsn. + snp_data->offset = last_committed_lsn; + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", + g_helper->replica_num(), snp_data->offset); + return; + } + + std::unique_lock lk(db_mtx_); + size_t kv_snapshot_data_size = snp_data->blob.size(); + LOGINFOMOD(replication, "Snapshot size {}", kv_snapshot_data_size); + auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); + for (size_t i = 0; i < kv_snapshot_data_size / sizeof(KeyValuePair); i++) { + auto key = ptr->key; + auto value = ptr->value; + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot got lsn={} data_size={} data_pattern={}", + g_helper->replica_num(), value.lsn_, value.data_size_, value.data_pattern_); + + // Write to data service and inmem map. + MultiBlkId out_blkids; + if (value.data_size_ != 0) { + snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); + value.blkid_ = out_blkids; + } + inmem_db_.insert_or_assign(key, value); + last_committed_lsn = value.lsn_; + ++commit_count_; + ptr++; + } + } + + bool apply_snapshot(shared< snapshot_context > context) override { + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Apply snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + return true; + } + + shared< snapshot_context > last_snapshot() override { + if (!m_last_snapshot) return nullptr; + + auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(m_last_snapshot)->nuraft_snapshot(); + LOGINFOMOD(replication, "[Replica={}] Last snapshot term={} idx={}", g_helper->replica_num(), + s->get_last_log_term(), s->get_last_log_idx()); + return m_last_snapshot; + } + + void free_user_snp_ctx(void*& user_snp_ctx) override {} ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { return blk_alloc_hints{}; @@ -229,10 +339,31 @@ class TestReplicatedDB : public homestore::ReplDevListener { return inmem_db_.size(); } + void create_snapshot() { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + ulong snapshot_idx = raft_repl_dev->raft_server()->create_snapshot(); + LOGINFO("Manually create snapshot got index {}", snapshot_idx); + } + + void truncate(int num_reserved_entries) { + auto raft_repl_dev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev()); + raft_repl_dev->truncate(num_reserved_entries); + LOGINFO("Manually truncated"); + } + + void set_zombie() { zombie_ = true; } + bool is_zombie() { + // Wether a group is zombie(non recoverable) + return zombie_; + } + private: std::map< Key, Value > inmem_db_; uint64_t commit_count_{0}; std::shared_mutex db_mtx_; + uint64_t last_committed_lsn{0}; + std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; + bool zombie_{false}; }; class RaftReplDevTest : public testing::Test { @@ -248,6 +379,7 @@ class RaftReplDevTest : public testing::Test { void TearDown() override { for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } run_on_leader(db, [this, db]() { auto err = hs()->repl_service().remove_repl_dev(db->repl_dev()->group_id()).get(); ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group"; @@ -255,6 +387,7 @@ class RaftReplDevTest : public testing::Test { } for (auto const& db : dbs_) { + if (db->is_zombie()) { continue; } auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); do { std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -327,7 +460,7 @@ class RaftReplDevTest : public testing::Test { auto leader_uuid = db->repl_dev()->get_leader_id(); if (leader_uuid.is_nil()) { - LOGINFO("Waiting for leader to be elected"); + LOGINFO("Waiting for leader to be elected for group={}", db->repl_dev()->group_id()); std::this_thread::sleep_for(std::chrono::milliseconds{500}); } else if (leader_uuid == g_helper->my_replica_id()) { lambda(); @@ -405,6 +538,9 @@ class RaftReplDevTest : public testing::Test { } } + void create_snapshot() { dbs_[0]->create_snapshot(); } + void truncate(int num_reserved_entries) { dbs_[0]->truncate(num_reserved_entries); } + protected: std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; uint32_t written_entries_{0}; @@ -644,15 +780,28 @@ TEST_F(RaftReplDevTest, RemoveReplDev) { std::this_thread::sleep_for(std::chrono::milliseconds(2)); this->remove_db(dbs_.back(), true /* wait_for_removal */); std::this_thread::sleep_for(std::chrono::seconds(2)); + LOGINFO("After remove db replica={} num_db={}", g_helper->replica_num(), dbs_.size()); // Step 3: Shutdown one of the follower and remove another repl_dev, once the follower is up, it should remove the // repl_dev and proceed LOGINFO("Shutdown one of the followers (replica=1) and then remove dbs on other members. Expect replica=1 to " "remove after it is up"); this->restart_replica(1, 15 /* shutdown_delay_sec */); - LOGINFO("After restart replica 1 {}", dbs_.size()); - this->remove_db(dbs_.back(), true /* wait_for_removal */); - LOGINFO("Remove last db {}", dbs_.size()); + LOGINFO("After restart replica={} num_db={}", g_helper->replica_num(), dbs_.size()); + std::this_thread::sleep_for(std::chrono::seconds(3)); + + // Since leader and follower 2 left the cluster, follower 1 is the only member in the raft group and need atleast + // 2 members to start leader election. In this case follower 1 can't be removed and goes to zombie state for this + // repl dev. + if (g_helper->replica_num() == 1) { + // Skip deleting this group during teardown. + LOGINFO("Set zombie on group={}", dbs_.back()->repl_dev()->group_id()); + dbs_.back()->set_zombie(); + } else { + this->remove_db(dbs_.back(), true /* wait_for_removal */); + LOGINFO("Remove last replica={} num_db={}", g_helper->replica_num(), dbs_.size()); + } + // TODO: Once generic crash flip/test_infra is available, use flip to crash during removal and restart them to see // if records are being removed g_helper->sync_for_cleanup_start(); @@ -704,6 +853,66 @@ TEST_F(RaftReplDevTest, GCReplReqs) { } #endif +TEST_F(RaftReplDevTest, BaselineTest) { + // Testing the baseline resync where leader creates snapshot and truncate entries. + // To simulate that write 10 entries to leader. Restart follower 1 with sleep 20s. + // Write to leader again to create 10 additional entries which follower 1 doesnt have. + // This is the baseline data. Truncate and snapshot on leader. Wait for commit for leader + // and follower 2. Write to leader again 10 entries after snapshot to create entries + // for incremental resync. We can create snapshot manually or triggered by raft. + // Verify all nodes got 20 entries. + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + +#ifdef _PRERELEASE + // If debug build we set flip to force truncate. + if (g_helper->replica_num() == 0) { + LOGINFO("Set force home logstore truncate"); + g_helper->set_basic_flip("force_home_raft_log_truncate"); + } +#endif + + // Write on leader. + uint64_t entries_per_attempt = 10; + LOGINFO("Write on leader num_entries={}", entries_per_attempt); + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + + // Restart follower-1 with delay. + this->restart_replica(1, 20 /* shutdown_delay_sec */); + + LOGINFO("Write on leader num_entries={}", entries_per_attempt); + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + + if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) { + // Wait for commmit on leader and follower-2 + this->wait_for_all_commits(); + LOGINFO("Got all commits for replica 0 and 2"); + } + + if (g_helper->replica_num() == 0) { + // Leader does manual snapshot and truncate + LOGINFO("Leader create snapshot and truncate"); + this->create_snapshot(); + this->truncate(0); + } + + // Write on leader to have some entries for increment resync. + LOGINFO("Write on leader num_entries={}", entries_per_attempt); + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + if (g_helper->replica_num() == 0 || g_helper->replica_num() == 2) { + // Wait for commmit on leader and follower-2 + this->wait_for_all_commits(); + LOGINFO("Got all commits for replica 0 and 2 second time"); + } + + // Validate all have 30 log entries and corresponding entries. + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + g_helper->sync_for_cleanup_start(); + LOGINFO("BaselineTest done"); +} + int main(int argc, char* argv[]) { int parsed_argc = argc; char** orig_argv = argv; @@ -726,12 +935,17 @@ int main(int argc, char* argv[]) { // HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.leadership_expiry_ms = -1; // -1 means never expires; - s.generic.repl_dev_cleanup_interval_sec = 0; + s.generic.repl_dev_cleanup_interval_sec = 1; // Disable implicit flush and timer. s.logstore.flush_threshold_size = 0; s.logstore.flush_timer_frequency_us = 0; + // Snapshot and truncation tests needs num reserved to be 0 and distance 10. + s.consensus.num_reserved_log_items = 0; + s.consensus.snapshot_freq_distance = 10; + s.resource_limits.resource_audit_timer_ms = 0; + // only reset when user specified the value for test; if (SISL_OPTIONS.count("snapshot_distance")) { s.consensus.snapshot_freq_distance = SISL_OPTIONS["snapshot_distance"].as< uint32_t >(); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index a401ba0d3..e166591ec 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -108,7 +108,15 @@ class SoloReplDevTest : public testing::Test { } } - AsyncReplResult<> create_snapshot(repl_snapshot& s) override { return make_async_success<>(); } + AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { + return make_async_success<>(); + } + int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + return 0; + } + void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {} + bool apply_snapshot(shared< snapshot_context > context) override { return true; } + shared< snapshot_context > last_snapshot() override { return nullptr; } bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) override {