diff --git a/.github/workflows/build_dependencies.yml b/.github/workflows/build_dependencies.yml
index de11fcc28..a4f845158 100644
--- a/.github/workflows/build_dependencies.yml
+++ b/.github/workflows/build_dependencies.yml
@@ -165,6 +165,7 @@ jobs:
       - name: Build Cache
         run: |
           pre=$([[ "${{ inputs.build-type }}" != "Debug" ]] && echo "-o sisl:prerelease=${{ inputs.prerelease }}" || echo "")
+          sudo rm -rf $ANDROID_HOME
           conan install \
             -c tools.build:skip_test=True \
             ${pre} \
@@ -219,7 +220,6 @@ jobs:
         run: |
           sanitize=$([[ "${{ inputs.tooling }}" == "Sanitize" ]] && echo "True" || echo "False")
           pre=$([[ "${{ inputs.build-type }}" != "Debug" ]] && echo "-o sisl:prerelease=${{ inputs.prerelease }}" || echo "")
-          sudo rm -rf /usr/local/lib/android/
           conan create \
             ${pre} \
             -o sisl:malloc_impl=${{ inputs.malloc-impl }} \
diff --git a/.jenkins/Dockerfile b/.jenkins/Dockerfile
index 1306f9140..d481ff641 100644
--- a/.jenkins/Dockerfile
+++ b/.jenkins/Dockerfile
@@ -15,6 +15,7 @@ RUN set -eux; \
 COPY test_index_btree /usr/local/bin/test_index_btree
 COPY test_meta_blk_mgr /usr/local/bin/test_meta_blk_mgr
 COPY test_log_store /usr/local/bin/test_log_store
+COPY test_home_raft_logstore /usr/local/bin/test_home_raft_logstore
 COPY test_log_store_long_run /usr/local/bin/test_log_store_long_run
 COPY test_data_service /usr/local/bin/test_data_service
 COPY test_raft_repl_dev /usr/local/bin/test_raft_repl_dev
diff --git a/.jenkins/jenkinsfile_nightly b/.jenkins/jenkinsfile_nightly
index 1af16b91a..7ef305257 100644
--- a/.jenkins/jenkinsfile_nightly
+++ b/.jenkins/jenkinsfile_nightly
@@ -44,6 +44,7 @@ pipeline {
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_index_btree' -exec cp {} .jenkins/test_index_btree \\;"
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr \\;"
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_log_store' -exec cp {} .jenkins/test_log_store \\;"
+                sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_home_raft_logstore' -exec cp {} .jenkins/test_home_raft_logstore \\;"
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_log_store_long_run' -exec cp {} .jenkins/test_log_store_long_run \\;"
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_data_service' -exec cp {} .jenkins/test_data_service \\;"
                 sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_raft_repl_dev' -exec cp {} .jenkins/test_raft_repl_dev \\;"
diff --git a/conanfile.py b/conanfile.py
index 5029beeca..33e6614c2 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,8 @@
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.4.53"
+    version = "6.4.55"
+    homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp
index 71a537756..a2091f114 100644
--- a/src/include/homestore/logstore/log_store.hpp
+++ b/src/include/homestore/logstore/log_store.hpp
@@ -225,6 +225,9 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
      * @brief Rollback the given instance to the given sequence number
      *
      * @param to_lsn Sequence number back which logs are to be rollbacked
+     *               After a successful rollback, to_lsn becomes the tail_lsn of this store.
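+     *               For example, if the store currently holds LSNs [1..100], rollback(80)
+     *               discards entries 81..100 and tail_lsn becomes 80.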
* @return True on success */ bool rollback(logstore_seq_num_t to_lsn); diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 5d1a0e8c2..9965ada5d 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -18,6 +18,7 @@ template < typename T > using ptr = std::shared_ptr< T >; class buffer; +class log_entry; } // namespace nuraft namespace homestore { @@ -25,6 +26,7 @@ class ReplDev; class ReplDevListener; struct repl_req_ctx; using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >; VENUM(repl_req_state_t, uint32_t, @@ -56,7 +58,9 @@ struct repl_key { }; bool operator==(repl_key const& other) const = default; - std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); } + std::string to_string() const { + return fmt::format("server={}, term={}, dsn={}, hash={}", server_id, term, dsn, Hasher()(*this)); + } }; using repl_snapshot = nuraft::snapshot; @@ -149,7 +153,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: raft_buf_ptr_t& raft_journal_buf(); uint8_t* raw_journal_buf(); - /////////////////////// Non modifiers methods ////////////////// std::string to_string() const; std::string to_compact_string() const; @@ -203,6 +206,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: void set_lsn(int64_t lsn); void add_state(repl_req_state_t s); bool add_state_if_not_already(repl_req_state_t s); + void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; } void clear(); flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; } void release_fb_builder() { m_fb_builder.Release(); } @@ -234,6 +238,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry repl_journal_entry* m_journal_entry{nullptr}; // pointer to the journal entry bool m_is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote + nuraft::ptr< nuraft::log_entry > m_lentry; /////////////// Replication state related section ///////////////// std::atomic< uint32_t > m_state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 507f02f19..e70c7b0f5 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -435,32 +435,39 @@ bool LogDev::flush() { return false; } - LogGroup* lg = prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes - if (sisl_unlikely(!lg)) { - THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx); - return false; - } - auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed); - HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size()); - off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size()); - lg->m_log_dev_offset = offset; - - HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); - THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset={} log_group={}", lg->actual_data_size(), offset, - *lg); - - 
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords());
-    HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size());
+    // The number of logs one LogGroup can flush has an upper limit. Here we want to make sure that all the logs
+    // which need to be flushed definitely reach the physical dev, so we loop and create multiple log groups if
+    // necessary.
+    for (; m_last_flush_idx < new_idx;) {
+        LogGroup* lg =
+            prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes
+        if (sisl_unlikely(!lg)) {
+            THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx);
+            return false;
+        }
+        auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed);
+        HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size());
+        off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size());
+        lg->m_log_dev_offset = offset;
+
+        HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full");
+        THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset={} log_group={}", lg->actual_data_size(),
+                        offset, *lg);
+
+        HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords());
+        HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size());
+
+        // TODO:: add logic to handle this error in the upper layer
+        auto error = m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset);
+        if (error) {
+            THIS_LOGDEV_LOG(ERROR, "Failed to sync write to journal vdev, error code {}: {}", error.value(),
+                            error.message());
+            return false;
+        }

-    // FIXME:: add logic to handle this error in upper layer
-    auto error = m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset);
-    if (error) {
-        THIS_LOGDEV_LOG(ERROR, "Fail to sync write to journal vde , error code {} : {}", error.value(),
-                        error.message());
-        return false;
+        on_flush_completion(lg);
     }
-    on_flush_completion(lg);
     return true;
 }
diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp
index 7c2fbe82f..1f2a4434b 100644
--- a/src/lib/logstore/log_store.cpp
+++ b/src/lib/logstore/log_store.cpp
@@ -177,6 +177,7 @@ void HomeLogStore::on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld
 void HomeLogStore::truncate(logstore_seq_num_t upto_lsn, bool in_memory_truncate_only) {
     if (upto_lsn < m_start_lsn) { return; }
+    flush();
 #ifndef NDEBUG
     auto cs = get_contiguous_completed_seq_num(0);
     if (upto_lsn > cs) {
@@ -283,13 +284,17 @@ void HomeLogStore::flush(logstore_seq_num_t upto_lsn) {
 }
 bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) {
-    // Validate if the lsn to which it is rolledback to is not truncated.
-    auto ret = m_records.status(to_lsn + 1);
-    if (ret.is_out_of_range) {
-        HS_LOG_ASSERT(false, "Attempted to rollback to {} which is already truncated", to_lsn);
+    // Fast path: already at the target lsn, nothing to roll back
+    if (to_lsn == m_tail_lsn.load()) {
+        return true;
+    }
+
+    if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load()) {
+        HS_LOG_ASSERT(false, "Attempted to rollback to {} which is not in the range of [{}, {}]", to_lsn, m_start_lsn.load(), m_tail_lsn.load());
         return false;
     }
+    THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, tail {}", to_lsn, m_tail_lsn.load());
     bool do_flush{false};
     do {
         {
diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp
index c57267f91..ce88571e4 100644
--- a/src/lib/replication/log_store/home_raft_log_store.cpp
+++ b/src/lib/replication/log_store/home_raft_log_store.cpp
@@ -264,8 +264,8 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) {
         [this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool {
             if (remain_cnt-- > 0) {
                 size_t avail_size = out_buf->size() - out_buf->pos();
-                if (avail_size < entry.size()) {
-                    avail_size += std::max(out_buf->size() * 2, (size_t)entry.size());
+                if (avail_size < entry.size() + sizeof(uint32_t)) {
+                    avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t));
                     out_buf = nuraft::buffer::expand(*out_buf, avail_size);
                 }
                 REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur),
@@ -322,20 +322,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
             append(m_dummy_log_entry);
         }
     }
-
-    m_log_store->flush(to_store_lsn(compact_lsn));
-
-    // this will only truncate in memory, and not on disk;
-    // we rely on resrouce mgr timer to trigger real truncate for all log stores in system;
-    // this will be friendly for multiple logstore on same logdev;
-#ifdef _PRERELEASE
-    if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) {
-        REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}",
-                       compact_lsn);
-        m_log_store->truncate(to_store_lsn(compact_lsn));
-    }
-#endif
-
+    m_log_store->truncate(to_store_lsn(compact_lsn));
     return true;
 }
diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h
index da4137e8f..ccf46ef92 100644
--- a/src/lib/replication/log_store/home_raft_log_store.h
+++ b/src/lib/replication/log_store/home_raft_log_store.h
@@ -30,6 +30,7 @@
 namespace homestore {
 using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
+using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
 class HomeRaftLogStore : public nuraft::log_store {
 public:
diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp
index 47947cd7c..71927a3ad 100644
--- a/src/lib/replication/repl_dev/common.cpp
+++ b/src/lib/replication/repl_dev/common.cpp
@@ -106,7 +106,8 @@ uint8_t* repl_req_ctx::raw_journal_buf() { return std::get< std::unique_ptr< uin
 void repl_req_ctx::set_lsn(int64_t lsn) {
     DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn),
-                 "Changing lsn for request={} on the fly can cause race condition, not expected", to_string());
+                 "Changing lsn for request={} on the fly can cause a race condition, not expected. 
lsn {}, m_lsn {}",
+                 to_string(), lsn, m_lsn);
     m_lsn = lsn;
     LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string());
 }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index bad06c16b..530cddff9 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -77,11 +77,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
     }
     RD_LOG(INFO,
-           "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} "
-           "next_dsn={} "
+           "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} committed_lsn={}, "
+           "compact_lsn={}, checkpoint_lsn={}, next_dsn={} "
            "log_dev={} log_store={}",
           (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
-           m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);
+           m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(),
+           m_rd_sb->logdev_id, m_rd_sb->logstore_id);
 #ifdef _PRERELEASE
     m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
@@ -746,7 +747,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
     RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
 }
-void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
+void RaftReplDev::commit_blk(repl_req_ptr_t rreq) {
     if (rreq->local_blkid().is_valid()) {
         if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) {
             if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done)
                 RD_DBG_ASSERT(false, "fail to commit blk when applying log in non-degraded mode.")
         }
     }
+}
+
+void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
+    commit_blk(rreq);
     // Remove the request from repl_key map.
     m_repl_key_req_map.erase(rreq->rkey());
@@ -1045,7 +1050,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
             if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
             if (entry->get_buf_ptr()->size() == 0) { continue; }
             auto req = m_state_machine->localize_journal_entry_prepare(*entry);
-            if (req == nullptr) {
+            // TODO:: we need to identify whether this log entry should be appended to the log store.
+            // 1. lsn: if req->lsn() is not -1, this log has been localized and appended before, so we
+            // should skip it.
+            // 2. dsn: if req->dsn() is less than next_dsn, this log has already been
+            // committed, so we should also skip it.
+            // For now we only check the first condition; revisit if we need to check the second as well.
+            if (req == nullptr || req->lsn() != -1) {
                 sisl::VectorPool< repl_req_ptr_t >::free(reqs);
                 return {true, nuraft::cb_func::ReturnCode::ReturnNull};
             }
@@ -1181,10 +1192,14 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
     }
     rreq->set_lsn(repl_lsn);
+    // keep lentry in scope for the life cycle of the rreq
+    rreq->set_lentry(lentry);
     rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
     RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());
     if (repl_lsn > m_rd_sb->durable_commit_lsn) {
+        // In-memory state of these blks is lost. 
Commit them now to avoid using the same blk twice.
+        commit_blk(rreq);
         m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn));
         return;
     }
@@ -1202,7 +1217,12 @@ bool RaftReplDev::is_resync_mode() {
     int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx();
     int64_t const my_log_idx = raft_server()->get_last_log_idx();
     auto diff = leader_commited_lsn - my_log_idx;
-    return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
+    bool resync_mode = (diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold));
+    if (resync_mode) {
+        RD_LOGD("Raft Channel: Resync mode, leader_committed_lsn={}, my_log_idx={}, diff={}", leader_commited_lsn,
+                my_log_idx, diff);
+    }
+    return resync_mode;
 }
 } // namespace homestore
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index a68e3f578..41594b528 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -31,6 +31,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {
 #pragma pack()
 using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
+using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
 ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);
@@ -229,6 +230,14 @@ class RaftReplDev : public ReplDev,
      */
     void on_restart();
+    /**
+     * \brief Force-leave the group without waiting for the destroy message to be committed.
+     * This is used when this repl_dev is a stale member of an already-destroyed group: the stale member never
+     * received the destroy message, and since the group is gone no leader will ever resend it. Forcing a leave
+     * keeps the stale member from lingering as part of the group.
+     */
+    void force_leave() { leave(); }
+
 protected:
     //////////////// All nuraft::state_mgr overrides ///////////////////////
     nuraft::ptr< nuraft::cluster_config > load_config() override;
@@ -258,6 +267,7 @@ class RaftReplDev : public ReplDev,
     void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
     bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
     void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
+    void commit_blk(repl_req_ptr_t rreq);
 };
 } // namespace homestore
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index 12a9be312..ba30095ca 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -201,6 +201,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
     return m_success_ptr;
 }
+void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) {
+    RD_LOGD("Raft channel: Commit new cluster conf, log_idx={}", log_idx);
+    // TODO: add more logic here if necessary
+}
+
 void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
     for (auto [key, rreq] : m_lsn_req_map) {
         cb(key, rreq);
diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h
index 1325bdeab..b931e42f4 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.h
+++ b/src/lib/replication/repl_dev/raft_state_machine.h
@@ -108,6 +108,7 @@ class RaftStateMachine : public nuraft::state_machine {
     uint64_t last_commit_index() override;
     raft_buf_ptr_t pre_commit_ext(const nuraft::state_machine::ext_op_params& params) override;
     raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
+    void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override;
     void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
     void become_ready();
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index 77b51ebe6..12120116a 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -338,7 +338,8 @@ void RaftReplService::start_reaper_thread() {
     m_reaper_fiber = iomanager.iofiber_self();
     // Schedule the rdev garbage collector timer
-    LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
+    LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds",
+               HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
     m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
         HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, nullptr,
         [this](void*) {
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 26b75752e..ddbac4c94 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -122,12 +122,11 @@ if (${io_tests})
     if(${epoll_tests})
         add_test(NAME LogDev-Epoll COMMAND test_log_dev)
         add_test(NAME LogStore-Epoll COMMAND test_log_store)
+        add_test(NAME HomeRaftLogStore-Epoll COMMAND test_home_raft_logstore)
         add_test(NAME MetaBlkMgr-Epoll COMMAND test_meta_blk_mgr)
         add_test(NAME DataService-Epoll COMMAND test_data_service)
-
-        # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
-        # 
add_test(NAME HomeRaftLogStore-Epoll COMMAND test_home_raft_logstore)
         add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev)
+        # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
     endif()
     can_build_spdk_io_tests(spdk_tests)
diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index 2de777428..5ba0824ac 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -46,7 +46,7 @@ SISL_OPTION_GROUP(
      "number"),
     (num_devs, "", "num_devs", "number of devices to create", ::cxxopts::value< uint32_t >()->default_value("3"),
      "number"),
-    (dev_size_mb, "", "dev_size_mb", "size of each device in MB", ::cxxopts::value< uint64_t >()->default_value("1024"),
+    (dev_size_mb, "", "dev_size_mb", "size of each device in MB", ::cxxopts::value< uint64_t >()->default_value("2048"),
      "number"),
     (device_list, "", "device_list", "Device List instead of default created",
      ::cxxopts::value< std::vector< std::string > >(), "path [...]"),
diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp
index 52208ce89..da0a7e458 100644
--- a/src/tests/test_log_dev.cpp
+++ b/src/tests/test_log_dev.cpp
@@ -262,6 +262,9 @@ TEST_F(LogDevTest, Rollback) {
     logstore_seq_num_t cur_lsn = 0;
     kickstart_inserts(log_store, cur_lsn, 500);
+    LOGINFO("Step 3.0: Rollback last 0 entries and validate if pre-rollback entries are intact");
+    rollback_validate(log_store, cur_lsn, 0); // Last entry = 500
+
     LOGINFO("Step 3: Rollback last 50 entries and validate if pre-rollback entries are intact");
     rollback_validate(log_store, cur_lsn, 50); // Last entry = 450
diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp
index 53ecf317e..5fd0ec21f 100644
--- a/src/tests/test_log_store_long_run.cpp
+++ b/src/tests/test_log_store_long_run.cpp
@@ -166,11 +166,6 @@ class SampleLogStoreClient {
     }
     void rollback_validate(uint32_t num_lsns_to_rollback) {
-
-        if (m_log_store->truncated_upto() == m_log_store->get_contiguous_completed_seq_num(-1)) {
-            // No records to rollback.
-            return;
-        }
         if ((m_cur_lsn - num_lsns_to_rollback - 1) <= m_log_store->get_contiguous_issued_seq_num(-1)) { return; }
         auto const upto_lsn = m_cur_lsn.fetch_sub(num_lsns_to_rollback) - num_lsns_to_rollback - 1;
         m_log_store->rollback(upto_lsn);
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 675cb517c..cc55187db 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -141,7 +141,6 @@ class TestReplicatedDB : public homestore::ReplDevListener {
         std::unique_lock lk(db_mtx_);
         inmem_db_.insert_or_assign(k, v);
         lsn_index_.emplace(lsn, v);
-        last_committed_lsn = lsn;
         ++commit_count_;
     }
@@ -193,7 +192,12 @@
     int64_t next_lsn = snp_data->offset;
     std::vector< KeyValuePair > kv_snapshot_data;
-    for (auto iter = lsn_index_.find(next_lsn); iter != lsn_index_.end(); iter++) {
+    // We cannot use find() to locate the next element: if next_lsn is a config lsn, it was never put into
+    // lsn_index_, so find() would return the end of the map. Use lower_bound() instead to get the first
+    // element that needs to be read and transferred.
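+    // For example, with lsn_index_ = {1, 2, 4} and next_lsn = 3 (lsn 3 was a config change),
+    // lower_bound(3) yields the entry at lsn 4, whereas find(3) would return end() and skip everything.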
+    for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) {
         auto& v = iter->second;
         kv_snapshot_data.emplace_back(Key{v.id_}, v);
         LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}",
@@ -211,7 +213,7 @@
     sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)};
     std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size);
     snp_data->blob = std::move(blob);
-    snp_data->is_last_obj = false;
+    snp_data->is_last_obj = true;
     LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}",
                g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
                kv_snapshot_data.size());
@@ -231,18 +233,17 @@
     void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {
         auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
-
+        auto last_committed_idx =
+            std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx();
         if (snp_data->offset == 0) {
-            // For obj_id 0 we sent back the last committed lsn.
-            snp_data->offset = last_committed_lsn;
-            LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}",
-                       g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
-                       snp_data->is_last_obj);
+            snp_data->offset = last_committed_idx + 1;
+            LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}",
+                       g_helper->replica_num(), snp_data->offset);
             return;
         }
         size_t kv_snapshot_data_size = snp_data->blob.size();
-        if (kv_snapshot_data_size == 0) { return; }
+        if (kv_snapshot_data_size == 0) return;
         size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair);
         std::unique_lock lk(db_mtx_);
@@ -260,7 +261,6 @@
             value.blkid_ = out_blkids;
         }
         inmem_db_.insert_or_assign(key, value);
-        last_committed_lsn = value.lsn_;
         ++commit_count_;
         ptr++;
     }
@@ -269,7 +269,7 @@
     LOGINFOMOD(replication,
                "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}",
                g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(),
                snp_data->is_last_obj, num_items);
-        snp_data->offset = last_committed_lsn + 1;
+        snp_data->offset = last_committed_idx + 1;
     }
     bool apply_snapshot(shared< snapshot_context > context) override {
@@ -392,7 +392,6 @@
     std::map< int64_t, Value > lsn_index_;
     uint64_t commit_count_{0};
     std::shared_mutex db_mtx_;
-    uint64_t last_committed_lsn{0};
     std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
     std::mutex m_snapshot_lock;
     bool zombie_{false};
@@ -421,11 +420,23 @@ class RaftReplDevTest : public testing::Test {
     for (auto const& db : dbs_) {
         if (db->is_zombie()) { continue; }
         auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
+        int i = 0;
+        bool force_leave = false;
         do {
             std::this_thread::sleep_for(std::chrono::seconds(1));
             auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
             raft_repl_svc.gc_repl_devs();
             LOGINFO("Waiting for repl dev to get destroyed");
+
+            // TODO: if the leader is destroyed but the follower does not receive the notification, the follower 
will never be
+            // destroyed. We need to handle this in raft_repl_dev; revisit after making changes on the
+            // raft_repl_dev side to handle this case. This is a workaround to avoid the infinite loop for now.
+            if (i++ > 10 && !force_leave) {
+                LOGWARN("Repl dev is still not destroyed, so do a force leave");
+                repl_dev->force_leave();
+                force_leave = true;
+            }
+
         } while (!repl_dev->is_destroyed());
     }
 }
@@ -919,16 +930,9 @@ TEST_F(RaftReplDevTest, BaselineTest) {
     LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
     g_helper->sync_for_test_start();
-#ifdef _PRERELEASE
-    // If debug build we set flip to force truncate.
-    if (g_helper->replica_num() == 0) {
-        LOGINFO("Set force home logstore truncate");
-        g_helper->set_basic_flip("force_home_raft_log_truncate");
-    }
-#endif
-
     // Write some entries on leader.
     uint64_t entries_per_attempt = 50;
+    LOGINFO("Write on leader num_entries={}", entries_per_attempt);
     this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);
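
--- Reviewer notes (sketches, not part of the patch) ---

On the LogDev::flush() change: a single LogGroup can carry only a bounded number of records, so flush()
now loops, building and persisting one group per iteration until m_last_flush_idx catches up with new_idx.
A minimal sketch of that shape, with hypothetical prepare_group()/write_group() helpers standing in for
prepare_flush() and sync_pwritev():

    #include <algorithm>
    #include <cstdint>
    #include <optional>

    constexpr int64_t kMaxRecordsPerGroup = 4; // hypothetical per-group cap

    struct Group {
        int64_t from_idx;
        int64_t upto_idx;
    };

    // Build a group covering at most kMaxRecordsPerGroup indices, like prepare_flush().
    std::optional< Group > prepare_group(int64_t from, int64_t upto) {
        if (from > upto) return std::nullopt;
        return Group{from, std::min(upto, from + kMaxRecordsPerGroup - 1)};
    }

    bool write_group(Group const&) { return true; } // stands in for sync_pwritev()

    // Shape of the new flush(): keep creating groups until everything up to new_idx
    // is on the physical device; any write error stops the loop early.
    bool flush_all(int64_t& last_flush_idx, int64_t new_idx) {
        while (last_flush_idx < new_idx) {
            auto lg = prepare_group(last_flush_idx + 1, new_idx);
            if (!lg || !write_group(*lg)) return false;
            last_flush_idx = lg->upto_idx; // on_flush_completion() advances this in the real code
        }
        return true;
    }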
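
On the HomeRaftLogStore::pack() fix: each packed entry is written as a 4-byte length prefix followed by the
payload, so the free-space check must compare against entry.size() + sizeof(uint32_t) rather than
entry.size() alone. A self-contained sketch of that layout, with the nuraft buffer simplified to a
std::vector:

    #include <algorithm>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Packed layout per entry: [uint32_t size][payload]. Growing the buffer must
    // account for the prefix as well, mirroring the pack() capacity check.
    void pack_entry(std::vector< uint8_t >& buf, size_t& pos, std::vector< uint8_t > const& entry) {
        size_t const needed = entry.size() + sizeof(uint32_t);
        if (buf.size() - pos < needed) {
            buf.resize(buf.size() + std::max(buf.size() * 2, needed)); // grow by at least `needed`
        }
        uint32_t const sz = static_cast< uint32_t >(entry.size());
        std::memcpy(buf.data() + pos, &sz, sizeof(sz));               // length prefix
        std::memcpy(buf.data() + pos + sizeof(sz), entry.data(), sz); // payload
        pos += needed;
    }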
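
On the HomeLogStore::rollback() rewrite: validation is now a range check against [m_start_lsn, m_tail_lsn],
with to_lsn == tail_lsn as a no-op fast path, and a successful rollback makes to_lsn the new tail (matching
the updated doc comment in log_store.hpp). A simplified model of the accepted and rejected cases:

    #include <cstdint>

    // Simplified model: rollback to the current tail is a no-op, anything outside
    // [start, tail] is rejected (asserted in the real code), and a successful
    // rollback makes to_lsn the new tail.
    bool rollback(int64_t& tail, int64_t start, int64_t to_lsn) {
        if (to_lsn == tail) return true;                   // fast path, nothing to discard
        if (to_lsn > tail || to_lsn < start) return false; // out of range
        tail = to_lsn;                                     // entries (to_lsn, old tail] are dropped
        return true;
    }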