Skip to content

Commit

Permalink
More robust smoke.rs
Browse files Browse the repository at this point in the history
Remove explicit loop from smoke.rs in favor of block stream listener.

  * add `fn subscribe_blocks` to client
  * add some fail cases to process-compose.yml
  • Loading branch information
tbro committed Nov 26, 2024
1 parent c9f9583 commit 09b9932
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
12 changes: 12 additions & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ impl SequencerClient {
.context("subscribing to Espresso headers")
}

/// Subscribe to a stream of Block Headers
pub async fn subscribe_blocks(
&self,
height: u64,
) -> anyhow::Result<Connection<Header, Unsupported, ClientError, SequencerApiVersion>> {
self.0
.socket(&format!("availability/stream/blocks/{height}"))
.subscribe()
.await
.context("subscribing to Espresso Blocks")
}

/// Get the balance for a given account at a given block height, defaulting to current balance.
pub async fn get_espresso_balance(
&self,
Expand Down
8 changes: 6 additions & 2 deletions process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ processes:
success_threshold: 1
failure_threshold: 100
availability:
restart: "exit_on_failure"
restart: exit_on_failure

state-relay-server:
command: state-relay-server
Expand Down Expand Up @@ -155,6 +155,7 @@ processes:
condition: process_completed
availability:
exit_on_skipped: true
restart: exit_on_failure

readiness_probe:
http_get:
Expand Down Expand Up @@ -216,7 +217,7 @@ processes:
path: /healthcheck
failure_threshold: 100
availability:
exit_on_skipped: true
restart: exit_on_failure

sequencer2:
command: sequencer -- http -- catchup -- status
Expand Down Expand Up @@ -263,6 +264,7 @@ processes:
failure_threshold: 100
availability:
exit_on_skipped: true
restart: exit_on_failure

sequencer3:
command: sequencer -- http -- catchup -- status
Expand Down Expand Up @@ -310,6 +312,7 @@ processes:
failure_threshold: 100
availability:
exit_on_skipped: true
restart: exit_on_failure

sequencer4:
command: sequencer -- http -- catchup -- status
Expand Down Expand Up @@ -353,6 +356,7 @@ processes:
failure_threshold: 100
availability:
exit_on_skipped: true
restart: exit_on_failure

node_validator:
command: RUST_LOG=debug node-metrics --
Expand Down
51 changes: 38 additions & 13 deletions tests/smoke.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::common::TestConfig;
use anyhow::Result;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use futures::StreamExt;
use std::time::Instant;

/// We allow for no change in state across this many iterations.
const MAX_STATE_NOT_INCREMENTING: u8 = 1;
/// We allow for no new transactions across this many iterations.
const MAX_TXNS_NOT_INCREMENTING: u8 = 3;

#[tokio::test(flavor = "multi_thread")]
async fn test_smoke() -> Result<()> {
Expand All @@ -13,13 +18,18 @@ async fn test_smoke() -> Result<()> {
println!("Waiting on readiness");
let _ = testing.readiness().await?;

let mut initial = testing.test_state().await;
let initial = testing.test_state().await;
println!("Initial State:{}", initial);

let mut i = 1;
loop {
sleep(Duration::from_secs(1)).await;
let mut sub = testing
.espresso
.subscribe_blocks(initial.block_height.unwrap())
.await?;

let mut last = initial.clone();
let mut state_retries = 0;
let mut txn_retries = 0;
while (sub.next().await).is_some() {
let new = testing.test_state().await;
println!("New State:{}", new);

Expand All @@ -37,20 +47,35 @@ async fn test_smoke() -> Result<()> {
// test that we progress EXPECTED_BLOCK_HEIGHT blocks from where we started
if new.block_height.unwrap() >= testing.expected_block_height() + testing.initial_height {
println!("Reached {} block(s)!", testing.expected_block_height());
if new.txn_count - initial.txn_count < 1 {
panic!("Did not receive transactions");
}
break;
}

if i % 5 == 0 {
if new <= initial {
panic!("Chain state not incrementing");
if new <= last {
if state_retries > MAX_STATE_NOT_INCREMENTING {
panic!("Chain state did not increment.");
}
state_retries += 1;
println!("Chain state did not increment, trying again.");
} else {
// If state is incrementing reset the counter.
state_retries = 0;
}

if new.txn_count <= initial.txn_count {
panic!("Transactions not incrementing");
if new.txn_count <= last.txn_count {
if txn_retries >= MAX_TXNS_NOT_INCREMENTING {
panic!("No new transactions.");
}
initial = new;
txn_retries += 1;
println!("Transactions did not increment, trying again.");
} else {
// If transactions are incrementing reset the counter.
txn_retries = 0;
}
i += 1;

last = new;
}
Ok(())
}

0 comments on commit 09b9932

Please sign in to comment.