diff --git a/client/src/lib.rs b/client/src/lib.rs index 9f20f8f71..80b112ff5 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -56,6 +56,18 @@ impl SequencerClient { .context("subscribing to Espresso headers") } + /// Subscribe to a stream of Block Headers + pub async fn subscribe_blocks( + &self, + height: u64, + ) -> anyhow::Result> { + self.0 + .socket(&format!("availability/stream/blocks/{height}")) + .subscribe() + .await + .context("subscribing to Espresso Blocks") + } + /// Get the balance for a given account at a given block height, defaulting to current balance. pub async fn get_espresso_balance( &self, diff --git a/process-compose.yaml b/process-compose.yaml index bf0a95b44..825c13aa5 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -84,7 +84,7 @@ processes: success_threshold: 1 failure_threshold: 100 availability: - restart: "exit_on_failure" + restart: exit_on_failure state-relay-server: command: state-relay-server @@ -155,6 +155,7 @@ processes: condition: process_completed availability: exit_on_skipped: true + restart: exit_on_failure readiness_probe: http_get: @@ -216,7 +217,7 @@ processes: path: /healthcheck failure_threshold: 100 availability: - exit_on_skipped: true + restart: exit_on_failure sequencer2: command: sequencer -- http -- catchup -- status @@ -263,6 +264,7 @@ processes: failure_threshold: 100 availability: exit_on_skipped: true + restart: exit_on_failure sequencer3: command: sequencer -- http -- catchup -- status @@ -310,6 +312,7 @@ processes: failure_threshold: 100 availability: exit_on_skipped: true + restart: exit_on_failure sequencer4: command: sequencer -- http -- catchup -- status @@ -353,6 +356,7 @@ processes: failure_threshold: 100 availability: exit_on_skipped: true + restart: exit_on_failure node_validator: command: RUST_LOG=debug node-metrics -- diff --git a/tests/smoke.rs b/tests/smoke.rs index c16557421..ef8e78504 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -1,7 +1,12 @@ use crate::common::TestConfig; use anyhow::Result; -use std::time::{Duration, Instant}; -use tokio::time::sleep; +use futures::StreamExt; +use std::time::Instant; + +/// We allow for no change in state across this many iterations. +const MAX_STATE_NOT_INCREMENTING: u8 = 1; +/// We allow for no new transactions across this many iterations. +const MAX_TXNS_NOT_INCREMENTING: u8 = 3; #[tokio::test(flavor = "multi_thread")] async fn test_smoke() -> Result<()> { @@ -13,13 +18,18 @@ async fn test_smoke() -> Result<()> { println!("Waiting on readiness"); let _ = testing.readiness().await?; - let mut initial = testing.test_state().await; + let initial = testing.test_state().await; println!("Initial State:{}", initial); - let mut i = 1; - loop { - sleep(Duration::from_secs(1)).await; + let mut sub = testing + .espresso + .subscribe_blocks(initial.block_height.unwrap()) + .await?; + let mut last = initial.clone(); + let mut state_retries = 0; + let mut txn_retries = 0; + while (sub.next().await).is_some() { let new = testing.test_state().await; println!("New State:{}", new); @@ -37,20 +47,35 @@ async fn test_smoke() -> Result<()> { // test that we progress EXPECTED_BLOCK_HEIGHT blocks from where we started if new.block_height.unwrap() >= testing.expected_block_height() + testing.initial_height { println!("Reached {} block(s)!", testing.expected_block_height()); + if new.txn_count - initial.txn_count < 1 { + panic!("Did not receive transactions"); + } break; } - if i % 5 == 0 { - if new <= initial { - panic!("Chain state not incrementing"); + if new <= last { + if state_retries > MAX_STATE_NOT_INCREMENTING { + panic!("Chain state did not increment."); } + state_retries += 1; + println!("Chain state did not increment, trying again."); + } else { + // If state is incrementing reset the counter. + state_retries = 0; + } - if new.txn_count <= initial.txn_count { - panic!("Transactions not incrementing"); + if new.txn_count <= last.txn_count { + if txn_retries >= MAX_TXNS_NOT_INCREMENTING { + panic!("No new transactions."); } - initial = new; + txn_retries += 1; + println!("Transactions did not increment, trying again."); + } else { + // If transactions are incrementing reset the counter. + txn_retries = 0; } - i += 1; + + last = new; } Ok(()) }