diff --git a/examples/raft-kv-memstore/src/network/api.rs b/examples/raft-kv-memstore/src/network/api.rs index 90f7e6888..ebb55deba 100644 --- a/examples/raft-kv-memstore/src/network/api.rs +++ b/examples/raft-kv-memstore/src/network/api.rs @@ -39,7 +39,7 @@ pub async fn read(app: Data, req: Json) -> actix_web::Result, req: Json) -> actix_web::Result { - let ret = app.raft.is_leader().await; + let ret = app.raft.ensure_linearizable().await; match ret { Ok(_) => { diff --git a/examples/raft-kv-rocksdb/src/network/api.rs b/examples/raft-kv-rocksdb/src/network/api.rs index 353df4f7b..10bd5e6ab 100644 --- a/examples/raft-kv-rocksdb/src/network/api.rs +++ b/examples/raft-kv-rocksdb/src/network/api.rs @@ -43,7 +43,7 @@ async fn read(mut req: Request>) -> tide::Result { } async fn consistent_read(mut req: Request>) -> tide::Result { - let ret = req.state().raft.is_leader().await; + let ret = req.state().raft.ensure_linearizable().await; match ret { Ok(_) => { diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index cc6f10740..180f81173 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -74,8 +74,14 @@ pub type MemNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. - pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + pub TypeConfig: + D = ClientRequest, + R = ClientResponse, + NodeId = MemNodeId, + Node = (), + Entry = Entry, + SnapshotData = Cursor>, + AsyncRuntime = TokioRuntime ); /// The application snapshot type which the `MemStore` works with. diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 547ee30fd..6bb0f698f 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -29,6 +29,7 @@ use crate::core::command_state::CommandState; use crate::core::notify::Notify; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::AppendEntriesTx; +use crate::core::raft_msg::ClientReadTx; use crate::core::raft_msg::ClientWriteTx; use crate::core::raft_msg::InstallSnapshotTx; use crate::core::raft_msg::RaftMsg; @@ -45,7 +46,6 @@ use crate::engine::Engine; use crate::engine::Respond; use crate::entry::FromAppData; use crate::entry::RaftEntry; -use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; use crate::error::Fatal; use crate::error::ForwardToLeader; @@ -263,12 +263,19 @@ where // TODO: the second condition is such a read request can only read from state machine only when the last log it sees // at `T1` is committed. #[tracing::instrument(level = "trace", skip(self, tx))] - pub(super) async fn handle_check_is_leader_request( - &mut self, - tx: ResultSender<(), CheckIsLeaderError>, - ) { + pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx) { // Setup sentinel values to track when we've received majority confirmation of leadership. + let resp = { + let read_log_id = self.engine.state.get_read_log_id().copied(); + + // TODO: this applied is a little stale when being returned to client. + // Fix this when the following heartbeats are replaced with calling RaftNetwork. + let applied = self.engine.state.io_applied().copied(); + + (read_log_id, applied) + }; + let my_id = self.id; let my_vote = *self.engine.state.vote_ref(); let ttl = Duration::from_millis(self.config.heartbeat_interval); @@ -278,7 +285,7 @@ where let mut granted = btreeset! 
{my_id}; if eff_mem.is_quorum(granted.iter()) { - let _ = tx.send(Ok(())); + let _ = tx.send(Ok(resp)); return; } @@ -351,7 +358,7 @@ where } }; - // If we receive a response with a greater term, then revert to follower and abort this + // If we receive a response with a greater vote, then revert to follower and abort this // request. if let AppendEntriesResponse::HigherVote(vote) = append_res { debug_assert!( @@ -368,7 +375,7 @@ where }); if let Err(_e) = send_res { - tracing::error!("fail to send HigherVote to raft core"); + tracing::error!("fail to send HigherVote to RaftCore"); } // we are no longer leader so error out early @@ -380,7 +387,7 @@ where granted.insert(target); if eff_mem.is_quorum(granted.iter()) { - let _ = tx.send(Ok(())); + let _ = tx.send(Ok(resp)); return; } } @@ -395,6 +402,7 @@ where .into())); }; + // TODO: do not spawn, manage read requests with a queue by RaftCore C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting"))); } diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index e7b1c3c69..7c0892271 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -16,6 +16,9 @@ use crate::raft::InstallSnapshotRequest; use crate::raft::InstallSnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::type_config::alias::LogIdOf; +use crate::type_config::alias::NodeIdOf; +use crate::type_config::alias::NodeOf; use crate::ChangeMembers; use crate::MessageSummary; use crate::RaftTypeConfig; @@ -35,8 +38,11 @@ pub(crate) type VoteTx = ResultSender, Infallible>; pub(crate) type AppendEntriesTx = ResultSender, Infallible>; /// TX for Client Write Response -pub(crate) type ClientWriteTx = - ResultSender, ClientWriteError<::NodeId, ::Node>>; +pub(crate) type ClientWriteTx = ResultSender, ClientWriteError, NodeOf>>; + +/// TX for Linearizable Read Response +pub(crate) type ClientReadTx = + ResultSender<(Option>, Option>), CheckIsLeaderError, NodeOf>>; /// A message sent by application to the [`RaftCore`]. /// @@ -65,7 +71,7 @@ where C: RaftTypeConfig }, CheckIsLeaderRequest { - tx: ResultSender<(), CheckIsLeaderError>, + tx: ClientReadTx, }, Initialize { diff --git a/openraft/src/docs/protocol/mod.rs b/openraft/src/docs/protocol/mod.rs index fd20e0319..9b88cb02e 100644 --- a/openraft/src/docs/protocol/mod.rs +++ b/openraft/src/docs/protocol/mod.rs @@ -1,5 +1,9 @@ //! The protocol used by Openraft to replicate data. +pub mod read { + #![doc = include_str!("read.md")] +} + pub mod replication { #![doc = include_str!("replication.md")] diff --git a/openraft/src/docs/protocol/read.md b/openraft/src/docs/protocol/read.md new file mode 100644 index 000000000..82ac4b42c --- /dev/null +++ b/openraft/src/docs/protocol/read.md @@ -0,0 +1,74 @@ +# Read Operations + +Read operations within a Raft cluster are guaranteed to be linearizable. + +This ensures that for any two read operations, +`A` and `B`, if `B` occurs after `A` by wall clock time, +then `B` will observe the same state as `A` or any subsequent state changes made by `A`, +regardless of which node within the cluster each operation is performed on. + +In Openraft `read_index` is the same as the `read_index` in the original raft paper. +Openraft also use `read_log_id` instead of `read_index`. + +## Ensuring linearizability + +To ensure linearizability, read operations must perform a [`get_read_log_id()`] operation on the leader before proceeding. 
+ This method confirms that this node is the leader at the time of invocation by sending heartbeats to a quorum of followers, and returns `(read_log_id, last_applied_log_id)`: +- `read_log_id` represents the log id up to which the state machine should apply to ensure a + linearizable read, +- `last_applied_log_id` is the last applied log id. + +The caller then waits for `last_applied_log_id` to catch up with `read_log_id`, which can be done by subscribing to [`Raft::metrics`], +and finally proceeds with the state machine read. + +The above steps are encapsulated in the [`ensure_linearizable()`] method. + +## Examples + +```ignore +my_raft.ensure_linearizable().await?; +proceed_with_state_machine_read(); +``` + +The above snippet does the same as the following: + +```ignore +let (read_log_id, applied) = self.get_read_log_id().await?; + +if read_log_id.index() > applied.index() { + self.wait(None).applied_index_at_least(read_log_id.index(), "").await? +} + +proceed_with_state_machine_read(); +``` + +The comparison `read_log_id > applied_log_id` would also be valid in the above example. + + +## Ensuring Linearizability with `read_log_id` + +The `read_log_id` is determined as the maximum of the `last_committed_log_id` and the +log id of the first log entry in the current leader's term (the "blank" log entry). + +Assume an earlier read operation has read from the state machine up to log id `A`. +Since the leader has all committed entries up to its initial blank log entry, +we have: `read_log_id >= A`. + +When the `last_applied_log_id` meets or exceeds `read_log_id`, +the state machine contains all state up to `A`. Therefore, a linearizable read is assured +when `last_applied_log_id >= read_log_id`. + + +## Ensuring Linearizability with `read_index` + +It is also legal to compare only indexes, i.e., `last_applied_log_id.index() >= read_log_id.index()`, +due to the guarantee that committed logs will not be lost. + +Since a previous read could only have observed committed logs, and `read_log_id.index()` is +at least as large as any committed log, once `last_applied_log_id.index() >= read_log_id.index()`, the state machine is assured to reflect all entries seen by any past read. + + +[`ensure_linearizable()`]: crate::Raft::ensure_linearizable +[`get_read_log_id()`]: crate::Raft::get_read_log_id +[`Raft::metrics`]: crate::Raft::metrics diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 0065e8ed3..68eac2d08 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -269,7 +269,6 @@ impl LogIdList { /// Get the log id at the specified index. /// /// It will return `last_purged_log_id` if index is at the last purged index. - #[allow(dead_code)] pub(crate) fn get(&self, index: u64) -> Option> { let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index)); @@ -285,12 +284,10 @@ impl LogIdList { } } - #[allow(dead_code)] pub(crate) fn first(&self) -> Option<&LogId> { self.key_log_ids.first() } - #[allow(dead_code)] pub(crate) fn last(&self) -> Option<&LogId> { self.key_log_ids.last() } @@ -298,4 +295,23 @@ impl LogIdList { pub(crate) fn key_log_ids(&self) -> &[LogId] { &self.key_log_ids } + + /// Returns key log ids appended by the last leader. + /// + /// Note that the 0-th log does not belong to any leader (it is a membership log that initializes a + /// cluster), but this method does not differentiate between them.
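+ ///
+ /// For example (illustrative; the same cases are exercised by `test_log_id_list_by_last_leader`
+ /// below): with key log ids `[(1,1,1), (7,1,8), (7,1,10)]` the last leader (term 7) appended the
+ /// last two entries, so this returns `&[(7,1,8), (7,1,10)]`; with `[(1,1,1), (3,1,3)]` it returns
+ /// `&[(3,1,3)]`.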
+ pub(crate) fn by_last_leader(&self) -> &[LogId] { + let ks = &self.key_log_ids; + let l = ks.len(); + if l < 2 { + return ks; + } + + // There are at most two (adjacent) key log ids with the same leader_id + if ks[l - 1].leader_id() == ks[l - 2].leader_id() { + &ks[l - 2..] + } else { + &ks[l - 1..] + } + } } diff --git a/openraft/src/engine/tests/log_id_list_test.rs b/openraft/src/engine/tests/log_id_list_test.rs index 79b9ebb66..d67d26a3a 100644 --- a/openraft/src/engine/tests/log_id_list_test.rs +++ b/openraft/src/engine/tests/log_id_list_test.rs @@ -1,4 +1,5 @@ use crate::engine::LogIdList; +use crate::testing::log_id; #[test] fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { @@ -350,4 +351,28 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> { Ok(()) } -use crate::testing::log_id; + +#[test] +fn test_log_id_list_by_last_leader() -> anyhow::Result<()> { + // len == 0 + let ids = LogIdList::::default(); + assert_eq!(ids.by_last_leader(), &[]); + + // len == 1 + let ids = LogIdList::::new([log_id(1, 1, 1)]); + assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader()); + + // len == 2, the last leader has only one log + let ids = LogIdList::::new([log_id(1, 1, 1), log_id(3, 1, 3)]); + assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader()); + + // len == 2, the last leader has two logs + let ids = LogIdList::::new([log_id(1, 1, 1), log_id(1, 1, 3)]); + assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader()); + + // len > 2, the last leader has more than one log + let ids = LogIdList::::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]); + assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader()); + + Ok(()) +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 01294f388..26b5e2bbc 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -51,6 +51,7 @@ use crate::error::RaftError; use crate::membership::IntoNodes; use crate::metrics::RaftMetrics; use crate::metrics::Wait; +use crate::metrics::WaitError; use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; use crate::raft::runtime_config_handle::RuntimeConfigHandle; @@ -362,10 +363,96 @@ where C: RaftTypeConfig /// /// The actual read operation itself is up to the application, this method just ensures that /// the read will not be stale. + #[deprecated(note = "use `Raft::ensure_linearizable()` instead. deprecated since 0.9.0")] #[tracing::instrument(level = "debug", skip(self))] pub async fn is_leader(&self) -> Result<(), RaftError>> { let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await + let _ = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; + Ok(()) + } + + /// Ensures that read operations performed following this method are linearizable across the + /// cluster. + /// + /// This method is just a shorthand for calling [`get_read_log_id()`](Raft::get_read_log_id) and + /// then calling [Raft::wait]. + /// + /// This method confirms the node's leadership at the time of invocation by sending + /// heartbeats to a quorum of followers, and waits until the state machine has applied up to + /// the returned `read_log_id`. It blocks until both conditions are met. + /// + /// Returns: + /// - `Ok(read_log_id)` on successful confirmation that the node is the leader. `read_log_id` + /// represents the log id up to which the state machine has applied to ensure a linearizable + /// read.
+ /// - `Err(RaftError)` if it detects a higher term, or if it fails to + /// communicate with a quorum of followers. + /// + /// # Examples + /// ```ignore + /// my_raft.ensure_linearizable().await?; + /// // Proceed with the state machine read + /// ``` + /// Read more about how it works: [Read Operation](crate::docs::protocol::read) + #[tracing::instrument(level = "debug", skip(self))] + pub async fn ensure_linearizable( + &self, + ) -> Result>, RaftError>> { + let (read_log_id, applied) = self.get_read_log_id().await?; + + if read_log_id.index() > applied.index() { + self.wait(None) + .applied_index_at_least(read_log_id.index(), "ensure_linearizable") + .await + .map_err(|e| match e { + WaitError::Timeout(_, _) => { + unreachable!("did not specify timeout") + } + WaitError::ShuttingDown => Fatal::Stopped, + })?; + } + Ok(read_log_id) + } + + /// Ensures this node is leader and returns the log id up to which the state machine should + /// apply to ensure a read can be linearizable across the cluster. + /// + /// The leadership is ensured by sending heartbeats to a quorum of followers. + /// Note that this is just the first step for linearizable read. The second step is to wait for + /// state machine to reach the returned `read_log_id`. + /// + /// Returns: + /// - `Ok((read_log_id, last_applied_log_id))` on successful confirmation that the node is the + /// leader. `read_log_id` represents the log id up to which the state machine should apply to + /// ensure a linearizable read. + /// - `Err(RaftError)` if it detects a higher term, or if it fails to + /// communicate with a quorum of followers. + /// + /// The caller should then wait for `last_applied_log_id` to catch up, which can be done by + /// subscribing to [`Raft::metrics`] and waiting for `last_applied_log_id` to + /// reach `read_log_id`. + /// + /// # Examples + /// ```ignore + /// let (read_log_id, applied_log_id) = my_raft.get_read_log_id().await?; + /// if read_log_id.index() > applied_log_id.index() { + /// my_raft.wait(None).applied_index_at_least(read_log_id.index()).await?; + /// } + /// // Proceed with the state machine read + /// ``` + /// The comparison `read_log_id > applied_log_id` would also be valid in the above example. + /// + /// See: [Read Operation](crate::docs::protocol::read) + #[tracing::instrument(level = "debug", skip(self))] + pub async fn get_read_log_id( + &self, + ) -> Result< + (Option>, Option>), + RaftError>, + > { + let (tx, rx) = oneshot::channel(); + let (read_log_id, applied) = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; + Ok((read_log_id, applied)) } /// Submit a mutating client request to Raft to update the state of the system (ยง5.1). diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 57d07a1ca..79e46b12c 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -32,6 +32,7 @@ mod tests { mod accepted_test; mod forward_to_leader_test; mod log_state_reader_test; + mod read_log_id_test; mod validate_test; } @@ -40,6 +41,7 @@ pub(crate) use log_state_reader::LogStateReader; pub use membership_state::MembershipState; pub(crate) use vote_state_reader::VoteStateReader; +use crate::display_ext::DisplayOptionExt; pub(crate) use crate::raft_state::snapshot_streaming::StreamingState; /// A struct used to represent the raft state which a Raft node needs. @@ -219,6 +221,29 @@ where self.vote.utime() } + /// Get the log id for a linearizable read. 
+ /// + /// See: [Read Operation](crate::docs::protocol::read) + pub(crate) fn get_read_log_id(&self) -> Option<&LogId> { + // Get the first known log id appended by the last leader. + // - This log may not be committed. + // - The leader blank log may have been purged and this could be the last purged log id. + // - There must be such an entry, which is guaranteed by `Engine::establish_leader()`. + let leader_first = self.log_ids.by_last_leader().first(); + + debug_assert_eq!( + leader_first.map(|log_id| *log_id.committed_leader_id()), + self.vote_ref().committed_leader_id(), + "leader_first must belong to a leader of current vote: leader_first: {}, vote.committed_leader_id: {}", + leader_first.map(|log_id| log_id.committed_leader_id()).display(), + self.vote_ref().committed_leader_id().display(), + ); + + let committed = self.committed(); + + std::cmp::max(leader_first, committed) + } + /// Return the accepted last log id of the current leader. pub(crate) fn accepted(&self) -> Option<&LogId> { self.accepted.last_accepted_log_id(self.vote_ref().leader_id()) diff --git a/openraft/src/raft_state/tests/read_log_id_test.rs b/openraft/src/raft_state/tests/read_log_id_test.rs new file mode 100644 index 000000000..b14aa645a --- /dev/null +++ b/openraft/src/raft_state/tests/read_log_id_test.rs @@ -0,0 +1,40 @@ +use crate::engine::LogIdList; +use crate::utime::UTime; +use crate::CommittedLeaderId; +use crate::LogId; +use crate::RaftState; +use crate::TokioInstant; +use crate::Vote; + +fn log_id(term: u64, index: u64) -> LogId { + LogId:: { + leader_id: CommittedLeaderId::new(term, 0), + index, + } +} + +#[test] +fn test_raft_state_get_read_log_id() -> anyhow::Result<()> { + let log_ids = || LogIdList::new(vec![log_id(1, 1), log_id(3, 4), log_id(3, 6)]); + { + let rs = RaftState:: { + vote: UTime::without_utime(Vote::new_committed(3, 0)), + log_ids: log_ids(), + committed: Some(log_id(2, 1)), + ..Default::default() + }; + + assert_eq!(Some(log_id(3, 4)), rs.get_read_log_id().copied()); + } + + { + let rs = RaftState:: { + vote: UTime::without_utime(Vote::new_committed(3, 0)), + log_ids: log_ids(), + committed: Some(log_id(3, 5)), + ..Default::default() + }; + assert_eq!(Some(log_id(3, 5)), rs.get_read_log_id().copied()); + } + Ok(()) +} diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index b7152ea97..421f87539 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -86,4 +86,10 @@ pub(crate) mod alias { pub(crate) type InstantOf = as crate::AsyncRuntime>::Instant; pub(crate) type TimeoutErrorOf = as crate::AsyncRuntime>::TimeoutError; pub(crate) type TimeoutOf = as crate::AsyncRuntime>::Timeout; + + // Usually used types + pub(crate) type LogIdOf = crate::LogId>; + pub(crate) type VoteOf = crate::Vote>; + pub(crate) type LeaderIdOf = crate::LeaderId>; + pub(crate) type CommittedLeaderIdOf = crate::CommittedLeaderId>; } diff --git a/tests/tests/client_api/t11_client_reads.rs b/tests/tests/client_api/t11_client_reads.rs index 52a1a0716..7db692e2e 100644 --- a/tests/tests/client_api/t11_client_reads.rs +++ b/tests/tests/client_api/t11_client_reads.rs @@ -1,10 +1,17 @@ use std::sync::Arc; +use std::time::Duration; +use anyerror::AnyError; use anyhow::Result; use maplit::btreeset; +use openraft::error::NetworkError; +use openraft::error::RPCError; use openraft::Config; +use openraft::LogIdOptionExt; +use openraft::RPCTypes; use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RPCRequest; use crate::fixtures::RaftRouter; /// Client read tests. 
@@ -12,8 +19,8 @@ use crate::fixtures::RaftRouter; /// What does this test do? /// /// - create a stable 3-node cluster. -/// - call the is_leader interface on the leader, and assert success. -/// - call the is_leader interface on the followers, and assert failure. +/// - call the ensure_linearizable interface on the leader, and assert success. +/// - call the ensure_linearizable interface on the followers, and assert failure. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn client_reads() -> Result<()> { let config = Arc::new( @@ -25,35 +32,155 @@ async fn client_reads() -> Result<()> { ); let mut router = RaftRouter::new(config.clone()); - // This test is sensitive to network delay. + // This test is sensitive to network delay. Thus skip the network delay test router.network_send_delay(0); tracing::info!("--- initializing cluster"); let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; - // Get the ID of the leader, and assert that is_leader succeeds. + // Get the ID of the leader, and assert that ensure_linearizable succeeds. let leader = router.leader().expect("leader not found"); assert_eq!(leader, 0, "expected leader to be node 0, got {}", leader); router - .is_leader(leader) + .ensure_linearizable(leader) .await - .unwrap_or_else(|_| panic!("expected is_leader to succeed for cluster leader {}", leader)); + .unwrap_or_else(|_| panic!("ensure_linearizable to succeed for cluster leader {}", leader)); - router.is_leader(1).await.expect_err("expected is_leader on follower node 1 to fail"); - router.is_leader(2).await.expect_err("expected is_leader on follower node 2 to fail"); + router.ensure_linearizable(1).await.expect_err("ensure_linearizable on follower node 1 to fail"); + router.ensure_linearizable(2).await.expect_err("ensure_linearizable on follower node 2 to fail"); - tracing::info!(log_index, "--- isolate node 1 then is_leader should work"); + tracing::info!(log_index, "--- isolate node 1 then ensure_linearizable should work"); router.set_network_error(1, true); - router.is_leader(leader).await?; + router.ensure_linearizable(leader).await?; - tracing::info!(log_index, "--- isolate node 2 then is_leader should fail"); + tracing::info!(log_index, "--- isolate node 2 then ensure_linearizable should fail"); router.set_network_error(2, true); - let rst = router.is_leader(leader).await; - tracing::debug!(?rst, "is_leader with majority down"); + let rst = router.ensure_linearizable(leader).await; + tracing::debug!(?rst, "ensure_linearizable with majority down"); assert!(rst.is_err()); Ok(()) } + +/// - A leader that has not yet committed any log entries returns leader initialization log id(blank +/// log id). +/// - Return the last committed log id if the leader has committed any log entries. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn get_read_log_id() -> Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + enable_elect: false, + heartbeat_interval: 100, + election_timeout_min: 101, + election_timeout_max: 102, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?; + + // Blocks append-entries to node 0, but let heartbeat pass. 
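+ // The hook lets empty AppendEntries (heartbeats) pass, so the leader can still confirm its
+ // leadership, but fails any AppendEntries that carries entries; with only nodes 0 and 1 in the
+ // cluster, this prevents the new leader from committing anything.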
+ let block_to_n0 = |_router: &_, req, _id, target| { + if target == 0 { + match req { + RPCRequest::AppendEntries(a) => { + // Heartbeat is not blocked. + if a.entries.is_empty() { + return Ok(()); + } + } + _ => { + unreachable!(); + } + } + + // Block append-entries to block commit. + let any_err = AnyError::error("block append-entries to node 0"); + Err(RPCError::Network(NetworkError::new(&any_err))) + } else { + Ok(()) + } + }; + + tracing::info!("--- block append-entries to node 0"); + router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); + + // Expire current leader + tokio::time::sleep(Duration::from_millis(200)).await; + + tracing::info!("--- let node 1 to become leader, append a blank log"); + let n1 = router.get_raft_handle(&1).unwrap(); + n1.trigger().elect().await?; + + tracing::info!(log_index = log_index, "--- node 1 appends blank log but can not commit"); + { + let res = n1.wait(timeout()).applied_index_at_least(Some(log_index + 1), "blank log can not commit").await; + assert!(res.is_err()); + } + + let blank_log_index = log_index + 1; + + tracing::info!("--- get_read_log_id returns blank log id"); + { + let (read_log_id, applied) = n1.get_read_log_id().await?; + assert_eq!( + read_log_id.index(), + Some(blank_log_index), + "read-log-id is the blank log" + ); + assert_eq!(applied.index(), Some(log_index)); + } + + tracing::info!("--- stop blocking, write another log, get_read_log_id returns last log id"); + { + router.rpc_pre_hook(RPCTypes::AppendEntries, None); + + n1.wait(timeout()).applied_index(Some(log_index + 1), "commit blank log").await?; + log_index += 1; + + log_index += router.client_request_many(1, "foo", 1).await?; + + let (read_log_id, applied) = n1.get_read_log_id().await?; + assert_eq!(read_log_id.index(), Some(log_index), "read-log-id is the committed log"); + assert_eq!(applied.index(), Some(log_index)); + } + + let last_committed = log_index; + + tracing::info!( + "--- block append again, write 1 log that wont commit, get_read_log_id returns last committed log id" + ); + { + router.set_rpc_pre_hook(RPCTypes::AppendEntries, block_to_n0); + + let r = router.clone(); + tokio::spawn(async move { + // This will block for ever + let _x = r.client_request_many(1, "foo", 1).await; + }); + + log_index += 1; + n1.wait(timeout()).log_index(Some(log_index), "log appended, but not committed").await?; + + let (read_log_id, _applied) = n1.get_read_log_id().await?; + assert_eq!( + read_log_id.index(), + Some(last_committed), + "read-log-id is the committed log" + ); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(200)) +} diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index e6ebcf041..092a7f4db 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -716,13 +716,11 @@ impl TypedRaftRouter { node.add_learner(target, (), true).await.map_err(|e| e.into_api_error().unwrap()) } - /// Send a is_leader request to the target node. - pub async fn is_leader(&self, target: MemNodeId) -> Result<(), CheckIsLeaderError> { - let node = { - let rt = self.nodes.lock().unwrap(); - rt.get(&target).unwrap_or_else(|| panic!("node with ID {} does not exist", target)).clone() - }; - node.0.is_leader().await.map_err(|e| e.into_api_error().unwrap()) + /// Ensure read linearizability. 
+ pub async fn ensure_linearizable(&self, target: MemNodeId) -> Result<(), CheckIsLeaderError> { + let n = self.get_raft_handle(&target).unwrap(); + n.ensure_linearizable().await.map_err(|e| e.into_api_error().unwrap())?; + Ok(()) } /// Send a client request to the target node, causing test failure on error. diff --git a/tests/tests/membership/t10_single_node.rs b/tests/tests/membership/t10_single_node.rs index e40bfbc8b..3f806c9b3 100644 --- a/tests/tests/membership/t10_single_node.rs +++ b/tests/tests/membership/t10_single_node.rs @@ -48,7 +48,7 @@ async fn single_node() -> Result<()> { .await?; // Read some data from the single node cluster. - router.is_leader(0).await?; + router.ensure_linearizable(0).await?; Ok(()) }
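For reference, a minimal usage sketch (not part of the patch above): it assumes openraft 0.9's `Raft<C>` handle and a hypothetical `App` type that pairs it with a simple in-memory key-value state machine, loosely mirroring the `raft-kv-memstore` example touched at the top of this diff. The handler calls the new `ensure_linearizable()` before reading local state, which is what the updated `consistent_read` handlers do.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::RwLock;

use openraft::error::CheckIsLeaderError;
use openraft::error::RaftError;
use openraft::Raft;
use openraft::RaftTypeConfig;

/// Hypothetical application state: a Raft handle plus a readable key-value state machine.
struct App<C: RaftTypeConfig> {
    raft: Raft<C>,
    kv: Arc<RwLock<BTreeMap<String, String>>>,
}

impl<C: RaftTypeConfig> App<C> {
    /// Linearizable read: confirm leadership and state-machine freshness, then read locally.
    async fn consistent_read(
        &self,
        key: &str,
    ) -> Result<Option<String>, RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>> {
        // Blocks until leadership is confirmed by a quorum and the state machine has applied
        // up to the read log id; returns an error if this node is not the leader.
        self.raft.ensure_linearizable().await?;

        // A local read now observes at least everything any earlier linearizable read observed.
        Ok(self.kv.read().unwrap().get(key).cloned())
    }
}
```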