diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b42d5381c..87525a244 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1286,11 +1286,11 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type, if (ret == nuraft::cb_func::ReturnCode::Ok) { m_state_machine->inc_next_batch_size_hint(); } return ret; } + case nuraft::cb_func::Type::JoinedCluster: case nuraft::cb_func::Type::BecomeFollower: { become_follower_cb(); return nuraft::cb_func::ReturnCode::Ok; } - case nuraft::cb_func::Type::JoinedCluster: case nuraft::cb_func::Type::BecomeLeader: { become_leader_cb(); return nuraft::cb_func::ReturnCode::Ok; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index ad2e869e4..e1c9db917 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -217,6 +217,20 @@ class RaftReplDev : public ReplDev, cshared< ReplDevCPContext > get_cp_ctx(CP* cp); void cp_cleanup(CP* cp); void become_ready(); + void become_leader_cb() { + auto new_gate = raft_server()->get_last_log_idx(); + repl_lsn_t existing_gate = 0; + if (!m_traffic_ready_lsn.compare_exchange_strong(existing_gate, new_gate)) { + // was a follower, m_traffic_ready_lsn should be zero on follower. + RD_REL_ASSERT(existing_gate == 0, "existing gate should be zero"); + } + RD_LOGD("become_leader_cb: setting traffic_ready_lsn from {} to {}", existing_gate, new_gate); + }; + void become_follower_cb() { + // m_traffic_ready_lsn should be zero on follower. + m_traffic_ready_lsn.store(0); + RD_LOGD("become_follower_cb setting traffic_ready_lsn to 0"); + } /// @brief This method is called when the data journal is compacted /// @@ -275,20 +289,6 @@ class RaftReplDev : public ReplDev, nuraft::ptr< nuraft::log_store > load_log_store() override; int32_t server_id() override; void system_exit(const int exit_code) override { LOGINFO("System exiting with code [{}]", exit_code); } - void become_leader_cb() { - auto new_gate = get_last_commit_lsn(); - repl_lsn_t existing_gate = 0; - if (!m_traffic_ready_lsn.compare_exchange_strong(existing_gate, new_gate)) { - // was a follower, m_traffic_ready_lsn should be zero on follower. - RD_REL_ASSERT(existing_gate == 0, "existing gate should be zero"); - } - RD_LOGD("become_leader_cb: setting traffic_ready_lsn from {} to {}", existing_gate, new_gate); - }; - void become_follower_cb() { - // m_traffic_ready_lsn should be zero on follower. - m_traffic_ready_lsn.store(0); - RD_LOGD("become_follower_cb setting traffic_ready_lsn to 0"); - } //////////////// All nuraft_mesg::mesg_state_mgr overrides /////////////////////// uint32_t get_logstore_id() const override;