diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 187746cc4..2150cd139 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -267,7 +267,16 @@ where // Setup sentinel values to track when we've received majority confirmation of leadership. let resp = { - let read_log_id = self.engine.state.get_read_log_id().copied(); + let l = self.engine.leader_handler(); + let lh = match l { + Ok(leading_handler) => leading_handler, + Err(forward) => { + let _ = tx.send(Err(forward.into())); + return; + } + }; + + let read_log_id = lh.get_read_log_id(); // TODO: this applied is a little stale when being returned to client. // Fix this when the following heartbeats are replaced with calling RaftNetwork. @@ -1133,11 +1142,7 @@ where self.engine.handle_install_full_snapshot(vote, snapshot, tx); } RaftMsg::CheckIsLeaderRequest { tx } => { - if self.engine.state.is_leader(&self.engine.config.id) { - self.handle_check_is_leader_request(tx).await; - } else { - self.reject_with_forward_to_leader(tx); - } + self.handle_check_is_leader_request(tx).await; } RaftMsg::ClientWriteRequest { app_data, tx } => { self.write_entry(C::Entry::from_app_data(app_data), Some(tx)); diff --git a/openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs b/openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs new file mode 100644 index 000000000..fd11d8e11 --- /dev/null +++ b/openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use maplit::btreeset; +#[allow(unused_imports)] use pretty_assertions::assert_eq; +#[allow(unused_imports)] use pretty_assertions::assert_ne; +#[allow(unused_imports)] use pretty_assertions::assert_str_eq; + +use crate::engine::testing::UTConfig; +use crate::engine::Engine; +use crate::testing::log_id; +use crate::utime::UTime; +use crate::EffectiveMembership; +use crate::Membership; +use crate::MembershipState; +use crate::TokioInstant; +use crate::Vote; + +fn m01() -> Membership { + Membership::::new(vec![btreeset! {0,1}], None) +} + +fn m23() -> Membership { + Membership::::new(vec![btreeset! {2,3}], btreeset! {1,2,3}) +} + +fn eng() -> Engine { + let mut eng = Engine::testing_default(0); + eng.state.enable_validation(false); // Disable validation for incomplete state + + eng.config.id = 1; + eng.state.committed = Some(log_id(0, 1, 0)); + eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1)); + eng.state.log_ids.append(log_id(1, 1, 1)); + eng.state.log_ids.append(log_id(2, 1, 3)); + eng.state.membership_state = MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m23())), + ); + eng.state.server_state = eng.calc_server_state(); + + eng +} + +#[test] +fn test_get_read_log_id() -> anyhow::Result<()> { + let mut eng = eng(); + eng.vote_handler().become_leading(); + + eng.state.committed = Some(log_id(0, 1, 0)); + eng.internal_server_state.leading_mut().unwrap().noop_log_id = Some(log_id(1, 1, 2)); + + let got = eng.leader_handler()?.get_read_log_id(); + assert_eq!(Some(log_id(1, 1, 2)), got); + + eng.state.committed = Some(log_id(2, 1, 3)); + let got = eng.leader_handler()?.get_read_log_id(); + assert_eq!(Some(log_id(2, 1, 3)), got); + + Ok(()) +} diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index dc30f5f3d..2085140e4 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -1,4 +1,3 @@ -#[allow(unused_imports)] use crate::docs; use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; @@ -7,11 +6,14 @@ use crate::engine::EngineOutput; use crate::entry::RaftPayload; use crate::internal_server_state::LeaderQuorumSet; use crate::leader::Leading; +use crate::raft_state::LogStateReader; +use crate::type_config::alias::LogIdOf; use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; #[cfg(test)] mod append_entries_test; +#[cfg(test)] mod get_read_log_id_test; #[cfg(test)] mod send_heartbeat_test; /// Handle leader operations. @@ -81,6 +83,15 @@ where C: RaftTypeConfig rh.initiate_replication(SendNone::True); } + /// Get the log id for a linearizable read. + /// + /// See: [Read Operation](crate::docs::protocol::read) + pub(crate) fn get_read_log_id(&self) -> Option> { + let committed = self.state.committed().copied(); + // noop log id is the first log this leader proposed. + std::cmp::max(self.leader.noop_log_id, committed) + } + pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { ReplicationHandler { config: self.config, diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 68eac2d08..b952d2d7c 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -300,6 +300,7 @@ impl LogIdList { /// /// Note that the 0-th log does not belong to any leader(but a membership log to initialize a /// cluster) but this method does not differentiate between them. + #[allow(dead_code)] pub(crate) fn by_last_leader(&self) -> &[LogId] { let ks = &self.key_log_ids; let l = ks.len(); diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index bf97480d3..d390eef9c 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -30,7 +30,6 @@ mod tests { mod accepted_test; mod forward_to_leader_test; mod log_state_reader_test; - mod read_log_id_test; mod validate_test; } @@ -39,7 +38,6 @@ pub(crate) use log_state_reader::LogStateReader; pub use membership_state::MembershipState; pub(crate) use vote_state_reader::VoteStateReader; -use crate::display_ext::DisplayOptionExt; pub(crate) use crate::raft_state::snapshot_streaming::StreamingState; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; @@ -203,29 +201,6 @@ where C: RaftTypeConfig self.vote.utime() } - /// Get the log id for a linearizable read. - /// - /// See: [Read Operation](crate::docs::protocol::read) - pub(crate) fn get_read_log_id(&self) -> Option<&LogId> { - // Get the first known log id appended by the last leader. - // - This log may not be committed. - // - The leader blank log may have been purged and this could be the last purged log id. - // - There must be such an entry, which is guaranteed by `Engine::establish_leader()`. - let leader_first = self.log_ids.by_last_leader().first(); - - debug_assert_eq!( - leader_first.map(|log_id| *log_id.committed_leader_id()), - self.vote_ref().committed_leader_id(), - "leader_first must belong to a leader of current vote: leader_first: {}, vote.committed_leader_id: {}", - leader_first.map(|log_id| log_id.committed_leader_id()).display(), - self.vote_ref().committed_leader_id().display(), - ); - - let committed = self.committed(); - - std::cmp::max(leader_first, committed) - } - /// Return the accepted last log id of the current leader. pub(crate) fn accepted(&self) -> Option<&LogId> { self.accepted.last_accepted_log_id(self.vote_ref().leader_id()) diff --git a/openraft/src/raft_state/tests/read_log_id_test.rs b/openraft/src/raft_state/tests/read_log_id_test.rs deleted file mode 100644 index e1a399bb6..000000000 --- a/openraft/src/raft_state/tests/read_log_id_test.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::engine::testing::UTConfig; -use crate::engine::LogIdList; -use crate::utime::UTime; -use crate::CommittedLeaderId; -use crate::LogId; -use crate::RaftState; -use crate::Vote; - -fn log_id(term: u64, index: u64) -> LogId { - LogId:: { - leader_id: CommittedLeaderId::new(term, 0), - index, - } -} - -#[test] -fn test_raft_state_get_read_log_id() -> anyhow::Result<()> { - let log_ids = || LogIdList::new(vec![log_id(1, 1), log_id(3, 4), log_id(3, 6)]); - { - let rs = RaftState:: { - vote: UTime::without_utime(Vote::new_committed(3, 0)), - log_ids: log_ids(), - committed: Some(log_id(2, 1)), - ..Default::default() - }; - - assert_eq!(Some(log_id(3, 4)), rs.get_read_log_id().copied()); - } - - { - let rs = RaftState:: { - vote: UTime::without_utime(Vote::new_committed(3, 0)), - log_ids: log_ids(), - committed: Some(log_id(3, 5)), - ..Default::default() - }; - assert_eq!(Some(log_id(3, 5)), rs.get_read_log_id().copied()); - } - Ok(()) -}