diff --git a/conanfile.py b/conanfile.py index 870760692..5029beeca 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.52" + version = "6.4.53" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 5d1a0e8c2..2fd963df3 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -25,6 +25,7 @@ class ReplDev; class ReplDevListener; struct repl_req_ctx; using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >; VENUM(repl_req_state_t, uint32_t, @@ -56,7 +57,9 @@ struct repl_key { }; bool operator==(repl_key const& other) const = default; - std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); } + std::string to_string() const { + return fmt::format("server={}, term={}, dsn={}, hash={}", server_id, term, dsn, Hasher()(*this)); + } }; using repl_snapshot = nuraft::snapshot; diff --git a/src/lib/replication/log_store/home_raft_log_store.h b/src/lib/replication/log_store/home_raft_log_store.h index da4137e8f..ccf46ef92 100644 --- a/src/lib/replication/log_store/home_raft_log_store.h +++ b/src/lib/replication/log_store/home_raft_log_store.h @@ -30,6 +30,7 @@ namespace homestore { using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; class HomeRaftLogStore : public nuraft::log_store { public: diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 47947cd7c..71927a3ad 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -106,7 +106,8 @@ uint8_t* repl_req_ctx::raw_journal_buf() { return std::get< std::unique_ptr< uin void repl_req_ctx::set_lsn(int64_t lsn) { DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn), - "Changing lsn for request={} on the fly can cause race condition, not expected", to_string()); + "Changing lsn for request={} on the fly can cause race condition, not expected. lsn {}, m_lsn {}", + to_string(), lsn, m_lsn); m_lsn = lsn; LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string()); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index bad06c16b..77c52d65b 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1045,7 +1045,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } auto req = m_state_machine->localize_journal_entry_prepare(*entry); - if (req == nullptr) { + // TODO :: we need to indentify whether this log entry should be appended to log store. + // 1 for lsn, if the req#lsn is not -1, it means this log has been localized and apeneded before, we + // should skip it. + // 2 for dsn, if the req#dsn is less than the next_dsn, it means this log has been + // committed, we should skip it. + // here, we only check the first condition for now. revisit here if we need to check the second + if (req == nullptr || req->lsn() != -1) { sisl::VectorPool< repl_req_ptr_t >::free(reqs); return {true, nuraft::cb_func::ReturnCode::ReturnNull}; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index a68e3f578..65c93fad5 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -31,6 +31,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk { #pragma pack() using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >; ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED); @@ -229,6 +230,14 @@ class RaftReplDev : public ReplDev, */ void on_restart(); + /** + * \brief This method is called to force leave the group without waiting for committing the destroy message. + * it is used when the repl_dev is a stale member of a destroyed group. this stable member does not receive the + * destroy message. but the group is already destroyed, so no leader will send this message again to this stale + * member. we need to force leave the group to avoid the stale member to be a part of the group. + */ + void force_leave() { leave(); } + protected: //////////////// All nuraft::state_mgr overrides /////////////////////// nuraft::ptr< nuraft::cluster_config > load_config() override; diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 12a9be312..ba30095ca 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -201,6 +201,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params return m_success_ptr; } +void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) { + RD_LOGD("Raft channel: Commit new cluster conf , log_idx = {}", log_idx); + // TODO:add more logic here if necessary +} + void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) { for (auto [key, rreq] : m_lsn_req_map) { cb(key, rreq); diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 1325bdeab..b931e42f4 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -108,6 +108,7 @@ class RaftStateMachine : public nuraft::state_machine { uint64_t last_commit_index() override; raft_buf_ptr_t pre_commit_ext(const nuraft::state_machine::ext_op_params& params) override; raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override; + void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override; void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); } void become_ready(); diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 77b51ebe6..12120116a 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -338,7 +338,8 @@ void RaftReplService::start_reaper_thread() { m_reaper_fiber = iomanager.iofiber_self(); // Schedule the rdev garbage collector timer - LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); + LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer( HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, nullptr, [this](void*) { diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 675cb517c..0a2ab10b6 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -141,7 +141,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); lsn_index_.emplace(lsn, v); - last_committed_lsn = lsn; ++commit_count_; } @@ -233,11 +232,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); if (snp_data->offset == 0) { - // For obj_id 0 we sent back the last committed lsn. - snp_data->offset = last_committed_lsn; - LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={}", - g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - snp_data->is_last_obj); + auto raft_server = std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server(); + snp_data->offset = raft_server->get_committed_log_idx(); + LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", + g_helper->replica_num(), snp_data->offset); return; } @@ -260,7 +258,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { value.blkid_ = out_blkids; } inmem_db_.insert_or_assign(key, value); - last_committed_lsn = value.lsn_; ++commit_count_; ptr++; } @@ -392,7 +389,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::map< int64_t, Value > lsn_index_; uint64_t commit_count_{0}; std::shared_mutex db_mtx_; - uint64_t last_committed_lsn{0}; std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; std::mutex m_snapshot_lock; bool zombie_{false}; @@ -421,11 +417,23 @@ class RaftReplDevTest : public testing::Test { for (auto const& db : dbs_) { if (db->is_zombie()) { continue; } auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev()); + int i = 0; + bool force_leave = false; do { std::this_thread::sleep_for(std::chrono::seconds(1)); auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service()); raft_repl_svc.gc_repl_devs(); LOGINFO("Waiting for repl dev to get destroyed"); + + // TODO: if leader is destroyed, but the follower does not receive the notification, it will not be + // destroyed for ever. we need handle this in raft_repl_dev. revisit here after making changes at + // raft_repl_dev side to hanle this case. this is a workaround to avoid the infinite loop for now. + if (i++ > 10 && !force_leave) { + LOGWARN("Waiting for repl dev to get destroyed and it is leader, so do a force leave"); + repl_dev->force_leave(); + force_leave = true; + } + } while (!repl_dev->is_destroyed()); } } @@ -919,16 +927,9 @@ TEST_F(RaftReplDevTest, BaselineTest) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); -#ifdef _PRERELEASE - // If debug build we set flip to force truncate. - if (g_helper->replica_num() == 0) { - LOGINFO("Set force home logstore truncate"); - g_helper->set_basic_flip("force_home_raft_log_truncate"); - } -#endif - // Write some entries on leader. uint64_t entries_per_attempt = 50; + LOGINFO("Write on leader num_entries={}", entries_per_attempt); this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);