diff --git a/conanfile.py b/conanfile.py index f241149e6..f557e1bba 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.27" + version = "6.5.28" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" @@ -54,7 +54,7 @@ def build_requirements(self): def requirements(self): self.requires("iomgr/[^11.3]@oss/master", transitive_headers=True) self.requires("sisl/[^12.2]@oss/master", transitive_headers=True) - self.requires("nuraft_mesg/[^3.7]@oss/main", transitive_headers=True) + self.requires("nuraft_mesg/[^3.7.1]@oss/main", transitive_headers=True) self.requires("farmhash/cci.20190513@", transitive_headers=True) if self.settings.arch in ['x86', 'x86_64']: diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 335cda834..ec8344be0 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -124,8 +124,9 @@ struct snapshot_obj { bool is_last_obj{false}; }; -//HomeStore has some meta information to be transmitted during the baseline resync, -//Although now only dsn needs to be synced, this structure is defined as a general message, and we can easily add data if needed in the future. +// HomeStore has some meta information to be transmitted during the baseline resync, +// Although now only dsn needs to be synced, this structure is defined as a general message, and we can easily add data +// if needed in the future. struct snp_repl_dev_data { uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC}; uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1}; @@ -468,6 +469,10 @@ class ReplDev { /// @return last_commit_lsn virtual repl_lsn_t get_last_commit_lsn() const = 0; + /// @brief if this replica is ready for accepting client IO. + /// @return true if ready, false otherwise + virtual bool is_ready_for_traffic() const = 0; + virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); } virtual void detach_listener() { diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 1c2a8c560..e5b34dbcd 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -174,8 +174,7 @@ void repl_req_ctx::release_data() { m_buf_for_unaligned_data = sisl::io_blob_safe{}; if (m_pushed_data) { LOGTRACEMOD(replication, "m_pushed_data addr={}, m_rkey={}, m_lsn={}", - static_cast(m_pushed_data.get()), - m_rkey.to_string(), m_lsn); + static_cast< void* >(m_pushed_data.get()), m_rkey.to_string(), m_lsn); m_pushed_data->send_response(); m_pushed_data = nullptr; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index d92464c8b..2449f7833 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1237,8 +1237,7 @@ void RaftReplDev::leave() { m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete } -std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nuraft::cb_func::Type type, - nuraft::cb_func::Param* param) { +nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type, nuraft::cb_func::Param* param) { auto ret = nuraft::cb_func::ReturnCode::Ok; switch (type) { @@ -1283,7 +1282,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu // If there is nothing we can accept(i==0), that maens we are waiting for commit // of previous lsn, set it to 1 in this case. m_state_machine->reset_next_batch_size_hint(std::max(1ul, i)); - return {true, nuraft::cb_func::ReturnCode::ReturnNull}; + return nuraft::cb_func::ReturnCode::ReturnNull; } reqs->emplace_back(std::move(req)); } @@ -1298,7 +1297,21 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu sisl::VectorPool< repl_req_ptr_t >::free(reqs); } if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); } - return {true, ret}; + return ret; + } + case nuraft::cb_func::Type::JoinedCluster: + RD_LOGD("Raft channel: Received JoinedCluster, implies become_follower"); + become_follower_cb(); + return nuraft::cb_func::ReturnCode::Ok; + case nuraft::cb_func::Type::BecomeFollower: { + RD_LOGD("Raft channel: Received BecomeFollower"); + become_follower_cb(); + return nuraft::cb_func::ReturnCode::Ok; + } + case nuraft::cb_func::Type::BecomeLeader: { + RD_LOGD("Raft channel: Received BecomeLeader"); + become_leader_cb(); + return nuraft::cb_func::ReturnCode::Ok; } // RemovedFromCluster will be handled in nuraft_mesg::generic_raft_event_handler where leave() is called @@ -1307,7 +1320,7 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu default: break; } - return {false, ret}; + return nuraft::cb_func::ReturnCode::Ok; } void RaftReplDev::flush_durable_commit_lsn() { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 9e29a5737..e9ec2a1ad 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -137,6 +137,10 @@ class RaftReplDev : public ReplDev, std::atomic< repl_lsn_t > m_commit_upto_lsn{0}; // LSN which was lastly written, to track flushes std::atomic< repl_lsn_t > m_compact_lsn{0}; // LSN upto which it was compacted, it is used to track where to + // The `traffic_ready_lsn` variable holds the Log Sequence Number (LSN) up to which + // the state machine should committed to before accepting traffic. This threshold ensures that + // all potential committed log be committed before handling incoming requests. + std::atomic< repl_lsn_t > m_traffic_ready_lsn{0}; std::mutex m_sb_mtx; // Lock to protect the repl dev superblock @@ -187,6 +191,13 @@ class RaftReplDev : public ReplDev, bool is_destroy_pending() const; bool is_destroyed() const; Clock::time_point destroyed_time() const { return m_destroyed_time; } + bool is_ready_for_traffic() const { + auto committed_lsn = m_commit_upto_lsn.load(); + auto gate = m_traffic_ready_lsn.load(); + bool ready = committed_lsn >= gate; + if (!ready) { RD_LOGD("Not yet ready for traffic, committed to {} but gate is {}", committed_lsn, gate); } + return ready; + } //////////////// Accessor/shortcut methods /////////////////////// nuraft_mesg::repl_service_ctx* group_msg_service(); @@ -206,6 +217,20 @@ class RaftReplDev : public ReplDev, cshared< ReplDevCPContext > get_cp_ctx(CP* cp); void cp_cleanup(CP* cp); void become_ready(); + void become_leader_cb() { + auto new_gate = raft_server()->get_last_log_idx(); + repl_lsn_t existing_gate = 0; + if (!m_traffic_ready_lsn.compare_exchange_strong(existing_gate, new_gate)) { + // was a follower, m_traffic_ready_lsn should be zero on follower. + RD_REL_ASSERT(existing_gate == 0, "existing gate should be zero"); + } + RD_LOGD("become_leader_cb: setting traffic_ready_lsn from {} to {}", existing_gate, new_gate); + }; + void become_follower_cb() { + // m_traffic_ready_lsn should be zero on follower. + m_traffic_ready_lsn.store(0); + RD_LOGD("become_follower_cb setting traffic_ready_lsn to 0"); + } /// @brief This method is called when the data journal is compacted /// @@ -270,8 +295,8 @@ class RaftReplDev : public ReplDev, std::shared_ptr< nuraft::state_machine > get_state_machine() override; void permanent_destroy() override; void leave() override; - std::pair< bool, nuraft::cb_func::ReturnCode > handle_raft_event(nuraft::cb_func::Type, - nuraft::cb_func::Param*) override; + + nuraft::cb_func::ReturnCode raft_event(nuraft::cb_func::Type, nuraft::cb_func::Param*) override; private: shared< nuraft::log_store > data_journal() { return m_data_journal; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 8909614a0..09bd6b7ba 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -385,7 +385,7 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx()); auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); auto res = m_rd.m_listener->apply_snapshot(snp_ctx); - //make sure the changes are flushed. + // make sure the changes are flushed. hs()->cp_mgr().trigger_cp_flush(true /* force */).get(); return res; } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 911f4bd28..e5f33fb63 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -53,6 +53,7 @@ class SoloReplDev : public ReplDev { std::vector< peer_info > get_replication_status() const override { return std::vector< peer_info >{peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0}}; } + bool is_ready_for_traffic() const override { return true; } uuid_t group_id() const override { return m_group_id; } diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 21d5fa3f2..19a346f5a 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -350,6 +350,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { void validate_db_data() { g_helper->runner().set_num_tasks(inmem_db_.size()); + while (!repl_dev()->is_ready_for_traffic()) { + LOGINFO("not yet ready for traffic, waiting"); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } LOGINFOMOD(replication, "[{}]: Total {} keys committed, validating them", boost::uuids::to_string(repl_dev()->group_id()), inmem_db_.size()); @@ -554,7 +558,8 @@ class RaftReplDevTestBase : public testing::Test { if (dbs_[0]->repl_dev() == nullptr) return; do { - auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id(); + auto repl_dev = dbs_[0]->repl_dev(); + auto leader_uuid = repl_dev->get_leader_id(); if (leader_uuid.is_nil()) { LOGINFO("Waiting for leader to be elected"); @@ -562,6 +567,10 @@ class RaftReplDevTestBase : public testing::Test { } else if (leader_uuid == g_helper->my_replica_id()) { LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, boost::uuids::to_string(g_helper->my_replica_id())); + if (!repl_dev->is_ready_for_traffic()) { + LOGINFO("leader is not yet ready for traffic, waiting"); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + } auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); g_helper->runner().set_num_tasks(num_entries);