diff --git a/cluster_benchmark/tests/benchmark/bench_cluster.rs b/cluster_benchmark/tests/benchmark/bench_cluster.rs index 859df9a61..f8d7653cd 100644 --- a/cluster_benchmark/tests/benchmark/bench_cluster.rs +++ b/cluster_benchmark/tests/benchmark/bench_cluster.rs @@ -122,7 +122,7 @@ async fn do_bench(bench_config: &BenchConfig) -> anyhow::Result<()> { handles.push(h) } - leader.wait(timeout()).log_at_least(Some(total), "commit all written logs").await?; + leader.wait(timeout()).applied_index_at_least(Some(total), "commit all written logs").await?; let elapsed = now.elapsed(); diff --git a/cluster_benchmark/tests/benchmark/network.rs b/cluster_benchmark/tests/benchmark/network.rs index 5ba0298d5..c4ab046d6 100644 --- a/cluster_benchmark/tests/benchmark/network.rs +++ b/cluster_benchmark/tests/benchmark/network.rs @@ -73,7 +73,7 @@ impl Router { for (id, s) in rafts.iter_mut() { tracing::info!(log_index, "--- wait init log: {}, index: {}", id, log_index); - s.wait(timeout()).log(Some(log_index), "init").await?; + s.wait(timeout()).applied_index(Some(log_index), "init").await?; } Ok(()) diff --git a/macros/src/lib.rs b/macros/src/lib.rs index 5c3714288..217c839e0 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -1,6 +1,6 @@ use proc_macro::TokenStream; -/// This macro either emits `#[async_trait::async_trait]` or `#[asnc_trait::async_trait(?Send)]` +/// This macro either emits `#[async_trait::async_trait]` or `#[async_trait::async_trait(?Send)]` /// based on the activated feature set. /// /// This assumes that the `[async_trait](https://crates.io/crates/async-trait)` crate is imported diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 448df6609..f7c5ea23c 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -128,6 +128,7 @@ where } /// Wait until applied exactly `want_log`(inclusive) logs or timeout. + #[deprecated(note = "use `log_index()` and `applied_index()` instead, deprecated since 0.9.0")] #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { self.metrics( @@ -144,6 +145,7 @@ where } /// Wait until applied at least `want_log`(inclusive) logs or timeout. + #[deprecated(note = "use `log_index_at_least()` and `applied_index_at_least()` instead, deprecated since 0.9.0")] #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn log_at_least( &self, @@ -163,6 +165,59 @@ where .await } + /// Block until the last log index becomes exactly `index`(inclusive) or timeout. + #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] + pub async fn log_index(&self, index: Option, msg: impl ToString) -> Result, WaitError> { + self.metrics( + |x| x.last_log_index == index, + &format!("{} .last_log_index == {:?}", msg.to_string(), index), + ) + .await + } + + /// Block until the last log index becomes at least `index`(inclusive) or timeout. + #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] + pub async fn log_index_at_least( + &self, + index: Option, + msg: impl ToString, + ) -> Result, WaitError> { + self.metrics( + |x| x.last_log_index >= index, + &format!("{} .last_log_index >= {:?}", msg.to_string(), index), + ) + .await + } + + /// Block until the applied index becomes exactly `index`(inclusive) or timeout. + #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] + pub async fn applied_index( + &self, + index: Option, + msg: impl ToString, + ) -> Result, WaitError> { + self.metrics( + |x| x.last_applied.index() == index, + &format!("{} .last_applied.index == {:?}", msg.to_string(), index), + ) + .await + } + + /// Block until the last applied log index become at least `index`(inclusive) or timeout. + /// Note that this also implies `last_log_id >= index`. + #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] + pub async fn applied_index_at_least( + &self, + index: Option, + msg: impl ToString, + ) -> Result, WaitError> { + self.metrics( + |m| m.last_log_index >= index && m.last_applied.index() >= index, + &format!("{} .last_applied.index >= {:?}", msg.to_string(), index), + ) + .await + } + /// Wait for `state` to become `want_state` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index af8e8ec95..233d3a392 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -41,7 +41,7 @@ async fn test_wait() -> anyhow::Result<()> { } { - // wait for log + // wait for applied log let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { @@ -52,10 +52,10 @@ async fn test_wait() -> anyhow::Result<()> { let rst = tx.send(update); assert!(rst.is_ok()); }); - let got = w.log(Some(3), "log").await?; - let got_least2 = w.log_at_least(Some(2), "log").await?; - let got_least3 = w.log_at_least(Some(3), "log").await?; - let got_least4 = w.log_at_least(Some(4), "log").await; + let got = w.applied_index(Some(3), "log").await?; + let got_least2 = w.applied_index_at_least(Some(2), "log").await?; + let got_least3 = w.applied_index_at_least(Some(3), "log").await?; + let got_least4 = w.applied_index_at_least(Some(4), "log").await; h.await?; assert_eq!(Some(3), got.last_log_index); @@ -173,6 +173,34 @@ async fn test_wait() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn test_wait_log_index() -> anyhow::Result<()> { + // wait for applied log + let (init, w, tx) = init_wait_test::(); + + let h = tokio::spawn(async move { + sleep(Duration::from_millis(10)).await; + let mut update = init.clone(); + update.last_log_index = Some(3); + let rst = tx.send(update); + assert!(rst.is_ok()); + }); + + let got = w.log_index(Some(3), "log").await?; + let got_least2 = w.log_index_at_least(Some(2), "log").await?; + let got_least3 = w.log_index_at_least(Some(3), "log").await?; + let got_least4 = w.log_index_at_least(Some(4), "log").await; + h.await?; + + assert_eq!(Some(3), got.last_log_index); + assert_eq!(Some(3), got_least2.last_log_index); + assert_eq!(Some(3), got_least3.last_log_index); + + assert!(got_least4.is_err()); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_wait_vote() -> anyhow::Result<()> { let (init, w, tx) = init_wait_test::(); diff --git a/tests/tests/append_entries/t11_append_inconsistent_log.rs b/tests/tests/append_entries/t11_append_inconsistent_log.rs index 5efab79c9..3043c0b16 100644 --- a/tests/tests/append_entries/t11_append_inconsistent_log.rs +++ b/tests/tests/append_entries/t11_append_inconsistent_log.rs @@ -109,7 +109,7 @@ async fn append_inconsistent_log() -> Result<()> { router .wait(&0, Some(Duration::from_millis(2000))) // leader appends at least one blank log. There may be more than one transient leaders - .log_at_least(Some(log_index), "sync log to node 0") + .applied_index_at_least(Some(log_index), "sync log to node 0") .await?; let logs = sto0.get_log_entries(60..=60).await?; diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index e57fe2cf9..319661300 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -37,7 +37,7 @@ async fn enable_heartbeat() -> Result<()> { // no new log will be sent, . router .wait(&node_id, timeout()) - .log_at_least(Some(log_index), format!("node {} emit heartbeat log", node_id)) + .applied_index_at_least(Some(log_index), format!("node {} emit heartbeat log", node_id)) .await?; // leader lease is extended. diff --git a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs index 6e6121320..577fc310c 100644 --- a/tests/tests/append_entries/t61_heartbeat_reject_vote.rs +++ b/tests/tests/append_entries/t61_heartbeat_reject_vote.rs @@ -69,7 +69,7 @@ async fn heartbeat_reject_vote() -> Result<()> { { // TODO: this part can be removed when blank-log heartbeat is removed. sleep(Duration::from_millis(1500)).await; - router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?; } tracing::info!(log_index, "--- disable heartbeat, vote request will be granted"); @@ -77,7 +77,7 @@ async fn heartbeat_reject_vote() -> Result<()> { node0.runtime_config().heartbeat(false); sleep(Duration::from_millis(1500)).await; - router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "no log is written").await?; let res = node1.vote(VoteRequest::new(Vote::new(10, 2), Some(log_id(10, 1, 10)))).await?; assert!(res.vote_granted, "vote is granted after leader lease expired"); diff --git a/tests/tests/append_entries/t61_large_heartbeat.rs b/tests/tests/append_entries/t61_large_heartbeat.rs index 6ba126dc4..40c373e28 100644 --- a/tests/tests/append_entries/t61_large_heartbeat.rs +++ b/tests/tests/append_entries/t61_large_heartbeat.rs @@ -30,7 +30,7 @@ async fn large_heartbeat() -> Result<()> { router.client_request_many(0, "foo", 10).await?; log_index += 10; - router.wait(&1, Some(Duration::from_millis(3_000))).log(Some(log_index), "").await?; + router.wait(&1, Some(Duration::from_millis(3_000))).applied_index(Some(log_index), "").await?; Ok(()) } diff --git a/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs b/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs index 13ea8bdbb..c4d68a4b2 100644 --- a/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs +++ b/tests/tests/append_entries/t90_issue_216_stale_last_log_id.rs @@ -57,10 +57,10 @@ async fn stale_last_log_id() -> Result<()> { log_index += n_ops as u64; } - router.wait(&1, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?; - router.wait(&2, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?; - router.wait(&3, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?; - router.wait(&4, Some(Duration::from_millis(1000))).log(Some(log_index), "").await?; + router.wait(&1, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?; + router.wait(&2, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?; + router.wait(&3, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?; + router.wait(&4, Some(Duration::from_millis(1000))).applied_index(Some(log_index), "").await?; Ok(()) } diff --git a/tests/tests/client_api/t12_trigger_purge_log.rs b/tests/tests/client_api/t12_trigger_purge_log.rs index 0efed021f..18a1aa5e8 100644 --- a/tests/tests/client_api/t12_trigger_purge_log.rs +++ b/tests/tests/client_api/t12_trigger_purge_log.rs @@ -34,7 +34,10 @@ async fn trigger_purge_log() -> anyhow::Result<()> { log_index += router.client_request_many(0, "0", 10).await?; for id in [0, 1, 2] { - router.wait(&id, timeout()).log(Some(log_index), format_args!("node-{} write logs", id)).await?; + router + .wait(&id, timeout()) + .applied_index(Some(log_index), format_args!("node-{} write logs", id)) + .await?; } } @@ -53,7 +56,10 @@ async fn trigger_purge_log() -> anyhow::Result<()> { log_index += router.client_request_many(0, "0", 10).await?; for id in [0, 1, 2] { - router.wait(&id, timeout()).log(Some(log_index), format_args!("node-{} write logs", id)).await?; + router + .wait(&id, timeout()) + .applied_index(Some(log_index), format_args!("node-{} write logs", id)) + .await?; } } diff --git a/tests/tests/client_api/t13_trigger_snapshot.rs b/tests/tests/client_api/t13_trigger_snapshot.rs index 7670783f9..d03918799 100644 --- a/tests/tests/client_api/t13_trigger_snapshot.rs +++ b/tests/tests/client_api/t13_trigger_snapshot.rs @@ -41,8 +41,8 @@ async fn trigger_snapshot() -> anyhow::Result<()> { router.client_request_many(0, "0", 10).await?; log_index += 10; - router.wait(&0, timeout()).log(Some(log_index), "node-0 write logs").await?; - router.wait(&1, timeout()).log(Some(log_index), "node-1 write logs").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 write logs").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "node-1 write logs").await?; } tracing::info!(log_index, "--- trigger snapshot for node-0"); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 08953caa1..e6ebcf041 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -641,7 +641,7 @@ impl TypedRaftRouter { msg: &str, ) -> anyhow::Result<()> { for i in node_ids.iter() { - self.wait(i, timeout).log(want_log, msg).await?; + self.wait(i, timeout).applied_index(want_log, msg).await?; } Ok(()) } diff --git a/tests/tests/life_cycle/t10_initialization.rs b/tests/tests/life_cycle/t10_initialization.rs index ae3f2fa68..d6ec4c2ad 100644 --- a/tests/tests/life_cycle/t10_initialization.rs +++ b/tests/tests/life_cycle/t10_initialization.rs @@ -81,7 +81,7 @@ async fn initialization() -> anyhow::Result<()> { log_index += 1; for node_id in [0, 1, 2] { - router.wait(&node_id, timeout()).log(Some(log_index), "init").await?; + router.wait(&node_id, timeout()).applied_index(Some(log_index), "init").await?; } } diff --git a/tests/tests/life_cycle/t50_single_follower_restart.rs b/tests/tests/life_cycle/t50_single_follower_restart.rs index eba209957..3df96be65 100644 --- a/tests/tests/life_cycle/t50_single_follower_restart.rs +++ b/tests/tests/life_cycle/t50_single_follower_restart.rs @@ -63,7 +63,7 @@ async fn single_follower_restart() -> anyhow::Result<()> { router.client_request_many(0, "foo", 1).await?; log_index += 1; - router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?; } Ok(()) diff --git a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs index 8df778f76..0f3666dcb 100644 --- a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs +++ b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs @@ -49,7 +49,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> { tracing::info!(log_index, "--- a single leader should re-apply all logs"); { - router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?; } Ok(()) diff --git a/tests/tests/life_cycle/t90_issue_607_single_restart.rs b/tests/tests/life_cycle/t90_issue_607_single_restart.rs index cb0e6d2a0..1bffa0c9d 100644 --- a/tests/tests/life_cycle/t90_issue_607_single_restart.rs +++ b/tests/tests/life_cycle/t90_issue_607_single_restart.rs @@ -48,7 +48,7 @@ async fn single_restart() -> anyhow::Result<()> { router.client_request_many(0, "foo", 1).await?; log_index += 1; - router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 works").await?; } Ok(()) diff --git a/tests/tests/log_store/t10_save_committed.rs b/tests/tests/log_store/t10_save_committed.rs index 415896d05..899cee80b 100644 --- a/tests/tests/log_store/t10_save_committed.rs +++ b/tests/tests/log_store/t10_save_committed.rs @@ -29,7 +29,7 @@ async fn write_committed_log_id_to_log_store() -> Result<()> { log_index += router.client_request_many(0, "0", 10).await?; for i in [0, 1, 2] { - router.wait(&i, timeout()).log(Some(log_index), "write logs").await?; + router.wait(&i, timeout()).applied_index(Some(log_index), "write logs").await?; } for id in [0, 1, 2] { diff --git a/tests/tests/membership/t10_learner_restart.rs b/tests/tests/membership/t10_learner_restart.rs index f14dfa2af..200121e2d 100644 --- a/tests/tests/membership/t10_learner_restart.rs +++ b/tests/tests/membership/t10_learner_restart.rs @@ -49,7 +49,7 @@ async fn learner_restart() -> Result<()> { // restart node-1, assert the state as expected. let restarted = Raft::new(1, config.clone(), router.clone(), sto1, sm1).await?; - restarted.wait(timeout()).log(Some(log_index), "log after restart").await?; + restarted.wait(timeout()).applied_index(Some(log_index), "log after restart").await?; restarted.wait(timeout()).state(ServerState::Learner, "server state after restart").await?; Ok(()) diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index c57f19f3d..c60f2deb7 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -46,7 +46,7 @@ async fn add_learner_basic() -> Result<()> { log_index += 1; assert_eq!(log_index, res.log_id.index); - router.wait(&0, timeout()).log(Some(log_index), "commit re-adding leader log").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "commit re-adding leader log").await?; } tracing::info!(log_index, "--- add new node node-1"); @@ -86,7 +86,7 @@ async fn add_learner_basic() -> Result<()> { log_index += 1; assert_eq!(log_index, res.log_id.index); - router.wait(&0, timeout()).log(Some(log_index), "commit re-adding node-1 log").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "commit re-adding node-1 log").await?; let metrics = router.get_raft_handle(&0)?.metrics().borrow().clone(); let node_ids = metrics.membership_config.membership().nodes().map(|x| *x.0).collect::>(); @@ -122,7 +122,7 @@ async fn add_learner_non_blocking() -> Result<()> { router.client_request_many(0, "learner_add", 100 - log_index as usize).await?; log_index = 100; - router.wait(&0, timeout()).log(Some(log_index), "received 100 logs").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "received 100 logs").await?; router.new_raft_node(1).await; @@ -279,7 +279,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> { { router .wait(&orig_leader_id, timeout()) - .log(Some(log_index), "old leader commits 2 membership log") + .applied_index(Some(log_index), "old leader commits 2 membership log") .await?; } @@ -291,7 +291,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> { for id in [1, 3, 4] { router .wait(&id, timeout()) - .log_at_least( + .applied_index_at_least( Some(log_index), "node in new cluster finally commit at least one blank leader-initialize log", ) @@ -325,7 +325,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> { log_index += 1; for i in [1, 2, 3, 4] { - router.wait(&i, timeout()).log_at_least(Some(log_index), "learner recv new log").await?; + router.wait(&i, timeout()).applied_index_at_least(Some(log_index), "learner recv new log").await?; } } diff --git a/tests/tests/membership/t20_change_membership.rs b/tests/tests/membership/t20_change_membership.rs index 577755f29..dab26ec8c 100644 --- a/tests/tests/membership/t20_change_membership.rs +++ b/tests/tests/membership/t20_change_membership.rs @@ -35,7 +35,10 @@ async fn update_membership_state() -> anyhow::Result<()> { tracing::info!(log_index, "--- change_membership blocks until success: {:?}", res); for node_id in [0, 1, 2, 3, 4] { - router.wait(&node_id, timeout()).log(Some(log_index), "change-membership log applied").await?; + router + .wait(&node_id, timeout()) + .applied_index(Some(log_index), "change-membership log applied") + .await?; router.external_request(node_id, move |st| { tracing::debug!("--- got state: {:?}", st); assert_eq!(st.membership_state.committed().log_id().index(), Some(log_index)); @@ -70,7 +73,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> { router.client_request_many(0, "non_voter_add", 100 - log_index as usize).await?; log_index = 100; - router.wait(&0, timeout()).log(Some(log_index), "received 100 logs").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "received 100 logs").await?; } tracing::info!(log_index, "--- change membership without adding-learner"); @@ -103,7 +106,7 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { let mut router = RaftRouter::new(config.clone()); let log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?; - router.wait(&0, timeout()).log(Some(log_index), "received 100 logs").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "received 100 logs").await?; router.new_raft_node(1).await; let leader = router.get_raft_handle(&0)?; diff --git a/tests/tests/membership/t21_change_membership_cases.rs b/tests/tests/membership/t21_change_membership_cases.rs index e568a9610..eb3599fb2 100644 --- a/tests/tests/membership/t21_change_membership_cases.rs +++ b/tests/tests/membership/t21_change_membership_cases.rs @@ -167,7 +167,10 @@ async fn change_from_to(old: BTreeSet, change_members: BTreeSet, change_members: BTreeSet, change_members: BTreeSet, add: &[MemNodeId]) -> anyhow::R { log_index += router.client_request_many(0, "client", 10).await?; for id in old.iter() { - router.wait(id, timeout()).log(Some(log_index), format!("write 10 logs, {}", mes)).await?; + router.wait(id, timeout()).applied_index(Some(log_index), format!("write 10 logs, {}", mes)).await?; } } @@ -278,7 +281,7 @@ async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::R router.new_raft_node(*id).await; router.add_learner(0, *id).await?; log_index += 1; - router.wait(id, timeout()).log(Some(log_index), format!("add learner, {}", mes)).await?; + router.wait(id, timeout()).applied_index(Some(log_index), format!("add learner, {}", mes)).await?; } } @@ -292,7 +295,10 @@ async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::R } for id in new.iter() { - router.wait(id, timeout()).log_at_least(Some(log_index), format!("new cluster, {}", mes)).await?; + router + .wait(id, timeout()) + .applied_index_at_least(Some(log_index), format!("new cluster, {}", mes)) + .await?; } } @@ -303,7 +309,7 @@ async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::R let mes = format!("new cluster recv logs 10~20, {}", mes); for id in new.iter() { - router.wait(id, timeout()).log_at_least(Some(log_index), &mes).await?; + router.wait(id, timeout()).applied_index_at_least(Some(log_index), &mes).await?; } } @@ -335,7 +341,7 @@ async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> any { log_index += router.client_request_many(0, "client", 10).await?; for id in old.iter() { - router.wait(id, timeout()).log(Some(log_index), format!("write 10 logs, {}", mes)).await?; + router.wait(id, timeout()).applied_index(Some(log_index), format!("write 10 logs, {}", mes)).await?; } } @@ -373,7 +379,10 @@ async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> any let new_leader = router.leader().expect("expected the cluster to have a leader"); for id in new.iter() { // new leader may already elected and committed a blank log. - router.wait(id, timeout()).log_at_least(Some(log_index), format!("new cluster, {}", mes)).await?; + router + .wait(id, timeout()) + .applied_index_at_least(Some(log_index), format!("new cluster, {}", mes)) + .await?; if new_leader != orig_leader { router @@ -421,7 +430,7 @@ async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> any router .wait(id, timeout()) // new leader may commit a blank log - .log_at_least(Some(log_index), format!("new cluster recv logs 10~20, {}", mes)) + .applied_index_at_least(Some(log_index), format!("new cluster recv logs 10~20, {}", mes)) .await?; } @@ -430,7 +439,7 @@ async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> any for id in only_in_old { let res = router .wait(id, timeout()) - .log( + .applied_index( Some(log_index), format!("node {} in old cluster wont recv new logs, {}", id, mes), ) diff --git a/tests/tests/membership/t30_elect_with_new_config.rs b/tests/tests/membership/t30_elect_with_new_config.rs index 3d3c99d08..c4cf42c11 100644 --- a/tests/tests/membership/t30_elect_with_new_config.rs +++ b/tests/tests/membership/t30_elect_with_new_config.rs @@ -55,7 +55,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { for node_id in [1, 2, 3, 4] { router .wait(&node_id, timeout()) - .log(Some(log_index), "replicate and apply log to every node") + .applied_index(Some(log_index), "replicate and apply log to every node") .await?; } diff --git a/tests/tests/membership/t31_remove_leader.rs b/tests/tests/membership/t31_remove_leader.rs index 123cc76d9..e7249b507 100644 --- a/tests/tests/membership/t31_remove_leader.rs +++ b/tests/tests/membership/t31_remove_leader.rs @@ -50,7 +50,7 @@ async fn remove_leader() -> Result<()> { { router .wait(&orig_leader, timeout()) - .log(Some(log_index), "old leader commits 2 membership log") + .applied_index(Some(log_index), "old leader commits 2 membership log") .await?; } @@ -59,7 +59,7 @@ async fn remove_leader() -> Result<()> { router .wait(&1, timeout()) - .log_at_least(Some(log_index), "node in old cluster commits at least 1 membership log") + .applied_index_at_least(Some(log_index), "node in old cluster commits at least 1 membership log") .await?; tracing::info!(log_index, "--- new cluster commits 2 membership logs"); @@ -71,7 +71,7 @@ async fn remove_leader() -> Result<()> { for id in [2, 3] { router .wait(&id, timeout()) - .log_at_least( + .applied_index_at_least( Some(log_index), "node in new cluster finally commit at least one blank leader-initialize log", ) @@ -141,7 +141,7 @@ async fn remove_leader_and_convert_to_learner() -> Result<()> { { router .wait(&old_leader, timeout()) - .log(Some(log_index), "old leader commits 2 membership log") + .applied_index(Some(log_index), "old leader commits 2 membership log") .await?; } @@ -150,7 +150,7 @@ async fn remove_leader_and_convert_to_learner() -> Result<()> { router .wait(&1, timeout()) - .log_at_least( + .applied_index_at_least( Some(log_index - 1), "node in old cluster commits at least 1 membership log", ) @@ -215,7 +215,7 @@ async fn remove_leader_access_new_cluster() -> Result<()> { { router .wait(&orig_leader, timeout()) - .log(Some(log_index), "old leader commits 2 membership log") + .applied_index(Some(log_index), "old leader commits 2 membership log") .await?; } @@ -245,7 +245,9 @@ async fn remove_leader_access_new_cluster() -> Result<()> { router.send_client_request(2, ClientRequest::make_request("foo", 1)).await?; log_index += 1; - n2.wait(timeout()).log(Some(log_index), "node-2 become leader and handle write request").await?; + n2.wait(timeout()) + .applied_index(Some(log_index), "node-2 become leader and handle write request") + .await?; } Ok(()) diff --git a/tests/tests/membership/t99_issue_584_replication_state_reverted.rs b/tests/tests/membership/t99_issue_584_replication_state_reverted.rs index 883c6dff3..d83d53ed5 100644 --- a/tests/tests/membership/t99_issue_584_replication_state_reverted.rs +++ b/tests/tests/membership/t99_issue_584_replication_state_reverted.rs @@ -34,7 +34,7 @@ async fn t99_issue_584_replication_state_reverted() -> Result<()> { router.client_request_many(0, "foo", (n - log_index) as usize).await?; log_index = n; - router.wait(&1, timeout()).log(Some(log_index), "replicate all logs to learner").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "replicate all logs to learner").await?; } tracing::info!( diff --git a/tests/tests/metrics/t40_metrics_wait.rs b/tests/tests/metrics/t40_metrics_wait.rs index 4ea8d7391..cc05548d1 100644 --- a/tests/tests/metrics/t40_metrics_wait.rs +++ b/tests/tests/metrics/t40_metrics_wait.rs @@ -43,7 +43,7 @@ async fn metrics_wait() -> Result<()> { tracing::info!("--- wait and timeout"); - let rst = router.wait(&0, timeout()).log(Some(2), "timeout waiting for log 2").await; + let rst = router.wait(&0, timeout()).applied_index(Some(2), "timeout waiting for log 2").await; match rst { Ok(_) => { diff --git a/tests/tests/replication/t10_append_entries_partial_success.rs b/tests/tests/replication/t10_append_entries_partial_success.rs index d470fbecd..6efc46f2a 100644 --- a/tests/tests/replication/t10_append_entries_partial_success.rs +++ b/tests/tests/replication/t10_append_entries_partial_success.rs @@ -39,14 +39,14 @@ async fn append_entries_partial_success() -> Result<()> { }); log_index += quota; - router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", quota)).await?; + router.wait(&0, timeout()).applied_index(Some(log_index), format!("{} writes", quota)).await?; log_index += 1; tracing::info!(log_index, "--- can not send log at index {}", log_index,); let res = router .wait(&0, timeout()) - .log(Some(log_index), format!("log index {} is limited by quota", log_index)) + .applied_index(Some(log_index), format!("log index {} is limited by quota", log_index)) .await; assert!(res.is_err(), "log index {} is limited by quota", log_index); @@ -57,7 +57,7 @@ async fn append_entries_partial_success() -> Result<()> { router.set_append_entries_quota(Some(1)); router .wait(&0, timeout()) - .log(Some(log_index), format!("log index {} can be replicated", log_index)) + .applied_index(Some(log_index), format!("log index {} can be replicated", log_index)) .await?; } diff --git a/tests/tests/replication/t50_append_entries_backoff.rs b/tests/tests/replication/t50_append_entries_backoff.rs index 6f76cf35e..5f6a4b503 100644 --- a/tests/tests/replication/t50_append_entries_backoff.rs +++ b/tests/tests/replication/t50_append_entries_backoff.rs @@ -49,7 +49,7 @@ async fn append_entries_backoff() -> Result<()> { router.client_request_many(0, "0", n as usize).await?; log_index += n; - router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", n)).await?; + router.wait(&0, timeout()).applied_index(Some(log_index), format!("{} writes", n)).await?; } let counts1 = router.get_rpc_count(); diff --git a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs index 720ba7e43..e5ee4f6b0 100644 --- a/tests/tests/replication/t50_append_entries_backoff_rejoin.rs +++ b/tests/tests/replication/t50_append_entries_backoff_rejoin.rs @@ -52,7 +52,9 @@ async fn append_entries_backoff_rejoin() -> Result<()> { tracing::info!(log_index, "--- write {} entries to node-1", n); { log_index += router.client_request_many(1, "1", n as usize).await?; - n1.wait(timeout()).log_at_least(Some(log_index), format!("node-1 commit {} writes", n)).await?; + n1.wait(timeout()) + .applied_index_at_least(Some(log_index), format!("node-1 commit {} writes", n)) + .await?; } tracing::info!(log_index, "--- restart node-0, check replication"); @@ -62,7 +64,7 @@ async fn append_entries_backoff_rejoin() -> Result<()> { router .wait(&0, timeout()) - .log_at_least(Some(log_index), format!("node-0 commit {} writes", n)) + .applied_index_at_least(Some(log_index), format!("node-0 commit {} writes", n)) .await?; } diff --git a/tests/tests/replication/t51_append_entries_too_large.rs b/tests/tests/replication/t51_append_entries_too_large.rs index 5fd58ffb9..8f4120dbb 100644 --- a/tests/tests/replication/t51_append_entries_too_large.rs +++ b/tests/tests/replication/t51_append_entries_too_large.rs @@ -37,7 +37,7 @@ async fn append_entries_too_large() -> Result<()> { tracing::info!(log_index, "--- write {} entries to leader", n); { log_index += router.client_request_many(0, "0", n as usize).await?; - router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", n)).await?; + router.wait(&0, timeout()).applied_index(Some(log_index), format!("{} writes", n)).await?; } let count = Arc::new(AtomicU64::new(0)); @@ -69,7 +69,7 @@ async fn append_entries_too_large() -> Result<()> { router.add_learner(0, 1).await?; log_index += 1; - router.wait(&1, timeout()).log(Some(log_index), "1 node added").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "1 node added").await?; } assert_eq!( diff --git a/tests/tests/replication/t60_feature_loosen_follower_log_revert.rs b/tests/tests/replication/t60_feature_loosen_follower_log_revert.rs index ab1f8f110..259e27e6c 100644 --- a/tests/tests/replication/t60_feature_loosen_follower_log_revert.rs +++ b/tests/tests/replication/t60_feature_loosen_follower_log_revert.rs @@ -33,7 +33,7 @@ async fn feature_loosen_follower_log_revert() -> Result<()> { { log_index += router.client_request_many(0, "0", 10).await?; for i in [0, 1, 2, 3] { - router.wait(&i, timeout()).log(Some(log_index), format!("{} writes", 10)).await?; + router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?; } } @@ -53,7 +53,7 @@ async fn feature_loosen_follower_log_revert() -> Result<()> { { log_index += router.client_request_many(0, "0", 10).await?; for i in [0, 1, 2, 3] { - router.wait(&i, timeout()).log(Some(log_index), format!("{} writes", 10)).await?; + router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?; } } diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs index 1d1f7054e..c96afa921 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs @@ -41,7 +41,7 @@ async fn building_snapshot_does_not_block_append() -> Result<()> { tracing::info!(log_index, "--- build snapshot on follower, it should block"); { log_index += router.client_request_many(0, "0", 10).await?; - router.wait(&1, timeout()).log(Some(log_index), "written 10 logs").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "written 10 logs").await?; follower.trigger().snapshot().await?; diff --git a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs index 69500c9f1..7a9c3b43b 100644 --- a/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs +++ b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs @@ -45,7 +45,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { tracing::info!(log_index, "--- build snapshot on follower, it should block"); { log_index += router.client_request_many(0, "0", 10).await?; - router.wait(&1, timeout()).log(Some(log_index), "written 10 logs").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "written 10 logs").await?; follower.trigger().snapshot().await?; @@ -84,7 +84,7 @@ async fn building_snapshot_does_not_block_apply() -> Result<()> { router .wait(&1, timeout()) - .log( + .applied_index( Some(next), format!("log at index {} can be applied, while snapshot is building", next), ) diff --git a/tests/tests/snapshot_building/t60_snapshot_policy_never.rs b/tests/tests/snapshot_building/t60_snapshot_policy_never.rs index 57394a6d5..34a51b195 100644 --- a/tests/tests/snapshot_building/t60_snapshot_policy_never.rs +++ b/tests/tests/snapshot_building/t60_snapshot_policy_never.rs @@ -64,7 +64,7 @@ async fn snapshot_policy_never() -> Result<()> { tracing::info!(log_index, "--- log_index: {}", log_index); router .wait(&0, timeout()) - .log(Some(log_index), format_args!("write log upto {}", log_index)) + .applied_index(Some(log_index), format_args!("write log upto {}", log_index)) .await?; let wait_snapshot_res = router diff --git a/tests/tests/snapshot_streaming/t20_startup_snapshot.rs b/tests/tests/snapshot_streaming/t20_startup_snapshot.rs index a6fbb98ef..7b928b24d 100644 --- a/tests/tests/snapshot_streaming/t20_startup_snapshot.rs +++ b/tests/tests/snapshot_streaming/t20_startup_snapshot.rs @@ -32,7 +32,7 @@ async fn startup_build_snapshot() -> anyhow::Result<()> { { log_index += router.client_request_many(0, "0", (20 - 1 - log_index) as usize).await?; - router.wait(&0, timeout()).log(Some(log_index), "node-0 applied all requests").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "node-0 applied all requests").await?; router.get_raft_handle(&0)?.trigger().snapshot().await?; router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-0 snapshot").await?; } diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 74d4bee49..35560b72f 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -64,7 +64,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { router.set_network_error(1, true); log_index += router.client_request_many(0, "0", 5).await?; - router.wait(&0, timeout()).log(Some(log_index), "write another 5 logs").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "write another 5 logs").await?; leader.trigger().snapshot().await?; leader diff --git a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs index 77aa5d9bb..3cca11f65 100644 --- a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs +++ b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs @@ -71,7 +71,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { router.new_raft_node_with_sto(0, sto0, sm0).await; router.wait(&0, timeout()).state(ServerState::Leader, "init node-0 server-state").await?; - router.wait(&0, timeout()).log(Some(log_index), "init node-0 log").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "init node-0 log").await?; } tracing::info!(log_index, "--- send just enough logs to trigger snapshot"); @@ -79,7 +79,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - 1 - log_index) as usize).await?; log_index = snapshot_threshold - 1; - router.wait(&0, timeout()).log(Some(log_index), "trigger snapshot").await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "trigger snapshot").await?; router .wait(&0, timeout()) .snapshot(LogId::new(CommittedLeaderId::new(5, 0), log_index), "build snapshot") diff --git a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs index 082e394c6..bf39d7837 100644 --- a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs +++ b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs @@ -33,7 +33,7 @@ async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> { tracing::info!(log_index, "--- write {} logs", n); { log_index += router.client_request_many(0, "0", n).await?; - router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", n)).await?; + router.wait(&0, timeout()).applied_index(Some(log_index), format!("{} writes", n)).await?; } let n0 = router.get_raft_handle(&0)?;