diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index beb01bf4a..40e253b91 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -20,9 +20,6 @@ pub enum BrokerFacadeError { ))] InvalidIndexes { expected: u128, got: u128 }, - #[snafu(display("failed to consume input event"))] - ConsumeError { source: BrokerError }, - #[snafu(display( "failed to find finish epoch input event epoch={}", epoch @@ -73,13 +70,16 @@ impl BrokerFacade { }) } - /// Consume rollups input event + /// Read rollups input event #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_input(&mut self) -> Result { - tracing::trace!(self.last_id, "consuming rollups input event"); + pub async fn read_input(&mut self) -> Result { + tracing::trace!( + self.last_id, + "reading rollups input event from stream" + ); let event = self .client - .consume_blocking(&self.inputs_stream, &self.last_id) + .read_blocking(&self.inputs_stream, &self.last_id) .await .context(BrokerInternalSnafu)?; if event.payload.parent_id != self.last_id { @@ -93,9 +93,9 @@ impl BrokerFacade { } } - /// Produce the rollups claim if it isn't in the stream yet + /// Add the rollups claim on redis if it isn't in the stream yet #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_rollups_claim( + pub async fn add_rollups_claim( &mut self, rollups_claim: RollupsClaim, ) -> Result<()> { @@ -105,26 +105,26 @@ impl BrokerFacade { tracing::trace!(rollups_claim.epoch_index, ?rollups_claim.epoch_hash, - "producing rollups claim" + "adding rollups claim to stream" ); let last_claim_event = self .client - .peek_latest(&self.claims_stream) + .read_latest(&self.claims_stream) .await .context(BrokerInternalSnafu)?; - let should_enqueue_claim = match last_claim_event { + let should_add_claim = match last_claim_event { Some(event) => { let last_claim = event.payload; tracing::trace!(?last_claim, "got last claim from Redis"); - 
let should_enqueue_claim = + let should_add_claim = rollups_claim.epoch_index > last_claim.epoch_index; // If this happens, then something is wrong with the dispatcher. let invalid_indexes = rollups_claim.first_index != last_claim.last_index + 1; - if should_enqueue_claim && invalid_indexes { + if should_add_claim && invalid_indexes { tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}", rollups_claim.first_index, last_claim.last_index); return Err(BrokerFacadeError::InvalidIndexes { @@ -133,7 +133,7 @@ impl BrokerFacade { }); }; - should_enqueue_claim + should_add_claim } None => { tracing::trace!("no claims in the stream"); @@ -141,9 +141,9 @@ impl BrokerFacade { } }; - if should_enqueue_claim { + if should_add_claim { self.client - .produce(&self.claims_stream, rollups_claim) + .add(&self.claims_stream, rollups_claim) .await .context(BrokerInternalSnafu)?; } @@ -151,17 +151,17 @@ impl BrokerFacade { Ok(()) } - /// Produce outputs to the rollups-outputs stream + /// Add outputs to the rollups-outputs stream #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_outputs( + pub async fn add_outputs( &mut self, outputs: Vec, ) -> Result<()> { - tracing::trace!(?outputs, "producing rollups outputs"); + tracing::trace!(?outputs, "adding rollups outputs to stream"); for output in outputs { self.client - .produce(&self.outputs_stream, output) + .add(&self.outputs_stream, output) .await .context(BrokerInternalSnafu)?; } @@ -196,7 +196,7 @@ mod tests { }; let config = BrokerConfig { redis_endpoint: fixture.redis_endpoint().to_owned(), - consume_timeout: 10, + read_timeout: 10, backoff, }; let facade = BrokerFacade::new(config, dapp_metadata, false) @@ -207,7 +207,7 @@ mod tests { } #[test_log::test(tokio::test)] - async fn test_it_consumes_inputs() { + async fn test_it_reads_inputs() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; let inputs = vec![ @@ -233,10 +233,10 @@ mod tests { ]; let mut ids = 
Vec::new(); for input in inputs.iter() { - ids.push(state.fixture.produce_input_event(input.clone()).await); + ids.push(state.fixture.add_input_event(input.clone()).await); } assert_eq!( - state.facade.consume_input().await.unwrap(), + state.facade.read_input().await.unwrap(), RollupsInput { parent_id: INITIAL_ID.to_owned(), epoch_index: 0, @@ -245,7 +245,7 @@ mod tests { }, ); assert_eq!( - state.facade.consume_input().await.unwrap(), + state.facade.read_input().await.unwrap(), RollupsInput { parent_id: ids[0].clone(), epoch_index: 0, @@ -254,7 +254,7 @@ mod tests { }, ); assert_eq!( - state.facade.consume_input().await.unwrap(), + state.facade.read_input().await.unwrap(), RollupsInput { parent_id: ids[1].clone(), epoch_index: 1, @@ -265,7 +265,7 @@ mod tests { } #[test_log::test(tokio::test)] - async fn test_it_does_not_produce_claim_when_it_was_already_produced() { + async fn test_it_does_not_add_claim_when_it_was_already_added() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; let rollups_claim = RollupsClaim { @@ -275,23 +275,17 @@ mod tests { first_index: 0, last_index: 6, }; - state - .fixture - .produce_rollups_claim(rollups_claim.clone()) - .await; + state.fixture.add_rollups_claim(rollups_claim.clone()).await; state .facade - .produce_rollups_claim(rollups_claim.clone()) + .add_rollups_claim(rollups_claim.clone()) .await .unwrap(); - assert_eq!( - state.fixture.consume_all_claims().await, - vec![rollups_claim] - ); + assert_eq!(state.fixture.read_all_claims().await, vec![rollups_claim]); } #[test_log::test(tokio::test)] - async fn test_it_produces_claims() { + async fn test_it_adds_claims() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; let rollups_claim0 = RollupsClaim { @@ -310,16 +304,16 @@ mod tests { }; state .facade - .produce_rollups_claim(rollups_claim0.clone()) + .add_rollups_claim(rollups_claim0.clone()) .await .unwrap(); state .facade - 
.produce_rollups_claim(rollups_claim1.clone()) + .add_rollups_claim(rollups_claim1.clone()) .await .unwrap(); assert_eq!( - state.fixture.consume_all_claims().await, + state.fixture.read_all_claims().await, vec![rollups_claim0, rollups_claim1] ); } @@ -344,12 +338,10 @@ mod tests { }; state .fixture - .produce_rollups_claim(rollups_claim1.clone()) - .await; - let result = state - .facade - .produce_rollups_claim(rollups_claim2.clone()) + .add_rollups_claim(rollups_claim1.clone()) .await; + let result = + state.facade.add_rollups_claim(rollups_claim2.clone()).await; assert!(result.is_err()); assert_eq!( BrokerFacadeError::InvalidIndexes { @@ -381,12 +373,10 @@ mod tests { }; state .fixture - .produce_rollups_claim(rollups_claim1.clone()) - .await; - let result = state - .facade - .produce_rollups_claim(rollups_claim2.clone()) + .add_rollups_claim(rollups_claim1.clone()) .await; + let result = + state.facade.add_rollups_claim(rollups_claim2.clone()).await; assert!(result.is_err()); assert_eq!( BrokerFacadeError::InvalidIndexes { diff --git a/offchain/advance-runner/src/runner.rs b/offchain/advance-runner/src/runner.rs index ccd09479b..421f540b7 100644 --- a/offchain/advance-runner/src/runner.rs +++ b/offchain/advance-runner/src/runner.rs @@ -21,17 +21,17 @@ pub enum RunnerError { #[snafu(display("failed to find finish epoch input event"))] FindFinishEpochInputError { source: BrokerFacadeError }, - #[snafu(display("failed to consume input from broker"))] - ConsumeInputError { source: BrokerFacadeError }, + #[snafu(display("failed to read input from broker"))] + ReadInputError { source: BrokerFacadeError }, - #[snafu(display("failed to get whether claim was produced"))] + #[snafu(display("failed to get whether claim was added to stream"))] PeekClaimError { source: BrokerFacadeError }, - #[snafu(display("failed to produce claim in broker"))] - ProduceClaimError { source: BrokerFacadeError }, + #[snafu(display("failed to add claim to stream in broker"))] + AddClaimError 
{ source: BrokerFacadeError }, - #[snafu(display("failed to produce outputs in broker"))] - ProduceOutputsError { source: BrokerFacadeError }, + #[snafu(display("failed to add outputs to stream in broker"))] + AddOutputsError { source: BrokerFacadeError }, } type Result = std::result::Result; @@ -54,12 +54,9 @@ impl Runner { tracing::info!("starting runner main loop"); loop { - let event = runner - .broker - .consume_input() - .await - .context(ConsumeInputSnafu)?; - tracing::info!(?event, "consumed input event"); + let event = + runner.broker.read_input().await.context(ReadInputSnafu)?; + tracing::info!(?event, "read input event"); match event.data { RollupsData::AdvanceStateInput(input) => { @@ -88,7 +85,7 @@ impl Runner { input_metadata: InputMetadata, input_payload: Vec, ) -> Result<()> { - tracing::trace!("handling advance state"); + tracing::info!("handling advance state"); let input_index = inputs_sent_count - 1; let outputs = self @@ -104,17 +101,17 @@ impl Runner { tracing::trace!("advance state sent to server-manager"); self.broker - .produce_outputs(outputs) + .add_outputs(outputs) .await - .context(ProduceOutputsSnafu)?; - tracing::trace!("produced outputs in broker"); + .context(AddOutputsSnafu)?; + tracing::trace!("added outputs to stream in broker"); Ok(()) } #[tracing::instrument(level = "trace", skip_all)] async fn handle_finish(&mut self, epoch_index: u64) -> Result<()> { - tracing::trace!("handling finish"); + tracing::info!("handling finish epoch"); let result = self.server_manager.finish_epoch(epoch_index).await; tracing::trace!("finished epoch in server-manager"); @@ -122,16 +119,16 @@ impl Runner { match result { Ok((rollups_claim, proofs)) => { self.broker - .produce_outputs(proofs) + .add_outputs(proofs) .await - .context(ProduceOutputsSnafu)?; - tracing::trace!("produced outputs in broker"); + .context(AddOutputsSnafu)?; + tracing::trace!("added outputs to broker stream"); self.broker - .produce_rollups_claim(rollups_claim) + 
.add_rollups_claim(rollups_claim) .await - .context(ProduceClaimSnafu)?; - tracing::info!("produced epoch claim"); + .context(AddClaimSnafu)?; + tracing::info!("added epoch claim to broker stream"); } Err(source) => { if let ServerManagerError::EmptyEpochError { .. } = source { diff --git a/offchain/advance-runner/tests/fixtures/mod.rs b/offchain/advance-runner/tests/fixtures/mod.rs index a5f68af10..5c65f0630 100644 --- a/offchain/advance-runner/tests/fixtures/mod.rs +++ b/offchain/advance-runner/tests/fixtures/mod.rs @@ -72,7 +72,7 @@ impl AdvanceRunnerFixture { let broker_config = BrokerConfig { redis_endpoint, - consume_timeout: 100, + read_timeout: 100, backoff: Default::default(), }; diff --git a/offchain/advance-runner/tests/host_integration.rs b/offchain/advance-runner/tests/host_integration.rs index 2604c3078..fca36d0e3 100644 --- a/offchain/advance-runner/tests/host_integration.rs +++ b/offchain/advance-runner/tests/host_integration.rs @@ -69,7 +69,7 @@ async fn send_inputs_to_server_manager() { let state = TestState::setup(&docker).await; const N: usize = 3; - tracing::info!("producing {} inputs", N); + tracing::info!("adding {} inputs", N); let payloads: Vec<_> = (0..N).map(|_| generate_payload()).collect(); for (i, payload) in payloads.iter().enumerate() { @@ -81,7 +81,7 @@ async fn send_inputs_to_server_manager() { payload: payload.clone().into(), tx_hash: Hash::default(), }); - state.broker.produce_input_event(data).await; + state.broker.add_input_event(data).await; } tracing::info!("waiting until the inputs are processed"); @@ -97,7 +97,7 @@ async fn advance_runner_fails_when_inputs_has_wrong_epoch() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing input with wrong epoch index"); + tracing::info!("adding input with wrong epoch index"); let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 0, @@ -112,7 +112,7 @@ async fn 
advance_runner_fails_when_inputs_has_wrong_epoch() { inputs_sent_count: 1, data, }; - state.broker.produce_raw_input_event(input).await; + state.broker.add_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); let err = state.advance_runner.wait_err().await; @@ -124,7 +124,7 @@ async fn advance_runner_fails_when_inputs_has_wrong_parent_id() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing input with wrong parent id"); + tracing::info!("adding input with wrong parent id"); let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 0, @@ -139,14 +139,14 @@ async fn advance_runner_fails_when_inputs_has_wrong_parent_id() { inputs_sent_count: 1, data, }; - state.broker.produce_raw_input_event(input).await; + state.broker.add_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); let err = state.advance_runner.wait_err().await; assert!(matches!( err, advance_runner::AdvanceRunnerError::RunnerError { - source: advance_runner::RunnerError::ConsumeInputError { + source: advance_runner::RunnerError::ReadInputError { source: advance_runner::BrokerFacadeError::ParentIdMismatchError { expected, got, @@ -162,7 +162,7 @@ async fn advance_runner_generates_claim_after_finishing_epoch() { let state = TestState::setup(&docker).await; const N: usize = 3; - tracing::info!("producing {} finish epoch events", N); + tracing::info!("adding {} finish epoch events", N); for i in 0..N { let advance = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -174,13 +174,13 @@ async fn advance_runner_generates_claim_after_finishing_epoch() { tx_hash: Hash::default(), }); let finish = RollupsData::FinishEpoch {}; - state.broker.produce_input_event(advance).await; - state.broker.produce_input_event(finish).await; + state.broker.add_input_event(advance).await; + 
state.broker.add_input_event(finish).await; } tracing::info!("waiting until the expected claims are generated"); state.server_manager.assert_session_ready().await; - let claims = state.broker.consume_n_claims(N).await; + let claims = state.broker.read_n_claims(N).await; // We don't verify the claim hash because it is not the resposability of the // advance_runner and because it changes every time we update the Cartesi Machine. assert_eq!(claims.len(), N); @@ -194,18 +194,18 @@ async fn advance_runner_finishes_epoch_when_the_previous_epoch_has_inputs() { tracing::info!("finishing epochs with no inputs"); state .broker - .produce_input_event(RollupsData::FinishEpoch {}) + .add_input_event(RollupsData::FinishEpoch {}) .await; state .broker - .produce_input_event(RollupsData::FinishEpoch {}) + .add_input_event(RollupsData::FinishEpoch {}) .await; tracing::info!("waiting until second epoch is finished"); state.server_manager.assert_epoch_finished(1).await; - tracing::info!("checking it didn't produce claims"); - let claims = state.broker.consume_all_claims().await; + tracing::info!("checking it didn't add claims"); + let claims = state.broker.read_all_claims().await; assert_eq!(claims.len(), 0); } @@ -215,7 +215,7 @@ async fn advance_runner_finishes_epoch_when_the_previous_epoch_has_inputs() { /// We can't simply wait for the epoch to be finished because the advance_runner /// still does tasks after that. 
async fn finish_epoch_and_wait_for_next_input(state: &TestState<'_>) { - tracing::info!("producing input, finish, and another input"); + tracing::info!("adding input, finish, and another input"); let payload = generate_payload(); let inputs = vec![ RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -237,7 +237,7 @@ async fn finish_epoch_and_wait_for_next_input(state: &TestState<'_>) { }), ]; for input in inputs { - state.broker.produce_input_event(input).await; + state.broker.add_input_event(input).await; } tracing::info!("waiting until second input is processed"); @@ -261,16 +261,16 @@ async fn advance_runner_does_not_generate_duplicate_claim() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing claim"); + tracing::info!("adding claim"); let claim = RollupsClaim::default(); - state.broker.produce_rollups_claim(claim.clone()).await; + state.broker.add_rollups_claim(claim.clone()).await; finish_epoch_and_wait_for_next_input(&state).await; tracing::info!("getting all claims"); - let produced_claims = state.broker.consume_all_claims().await; - assert_eq!(produced_claims.len(), 1); - assert_eq!(produced_claims[0], claim); + let added_claims = state.broker.read_all_claims().await; + assert_eq!(added_claims.len(), 1); + assert_eq!(added_claims[0], claim); } #[test_log::test(tokio::test)] @@ -283,7 +283,7 @@ async fn advance_runner_restore_session_after_restart() { tracing::info!("restarting advance_runner"); state.advance_runner.restart().await; - tracing::info!("producing another input and checking"); + tracing::info!("adding another input and checking"); let input = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 2, @@ -292,6 +292,6 @@ async fn advance_runner_restore_session_after_restart() { payload: generate_payload(), tx_hash: Hash::default(), }); - state.broker.produce_input_event(input).await; + state.broker.add_input_event(input).await; 
state.server_manager.assert_epoch_status(0, 1).await; } diff --git a/offchain/advance-runner/tests/server_integration.rs b/offchain/advance-runner/tests/server_integration.rs index 0ef18d97f..d00532ff7 100644 --- a/offchain/advance-runner/tests/server_integration.rs +++ b/offchain/advance-runner/tests/server_integration.rs @@ -66,7 +66,7 @@ async fn test_advance_runner_sends_inputs_to_server_manager() { let state = TestState::setup(&docker).await; const N: usize = 3; - tracing::info!("producing {} inputs", N); + tracing::info!("adding {} inputs", N); let payloads: Vec<_> = (0..N).map(|_| generate_payload()).collect(); for (i, payload) in payloads.iter().enumerate() { let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -77,7 +77,7 @@ async fn test_advance_runner_sends_inputs_to_server_manager() { payload: payload.clone().into(), tx_hash: Hash::default(), }); - state.broker.produce_input_event(data).await; + state.broker.add_input_event(data).await; } tracing::info!("waiting until the inputs are processed"); @@ -93,7 +93,7 @@ async fn test_advance_runner_fails_when_inputs_has_wrong_epoch() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing input with wrong epoch index"); + tracing::info!("adding input with wrong epoch index"); let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 0, @@ -108,7 +108,7 @@ async fn test_advance_runner_fails_when_inputs_has_wrong_epoch() { inputs_sent_count: 1, data, }; - state.broker.produce_raw_input_event(input).await; + state.broker.add_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); let err = state.advance_runner.wait_err().await; @@ -120,7 +120,7 @@ async fn test_advance_runner_fails_when_inputs_has_wrong_parent_id() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing input with wrong parent id"); + 
tracing::info!("adding input with wrong parent id"); let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 0, @@ -135,14 +135,14 @@ async fn test_advance_runner_fails_when_inputs_has_wrong_parent_id() { inputs_sent_count: 1, data, }; - state.broker.produce_raw_input_event(input).await; + state.broker.add_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); let err = state.advance_runner.wait_err().await; assert!(matches!( err, advance_runner::AdvanceRunnerError::RunnerError { - source: advance_runner::RunnerError::ConsumeInputError { + source: advance_runner::RunnerError::ReadInputError { source: advance_runner::BrokerFacadeError::ParentIdMismatchError { expected, got, @@ -158,7 +158,7 @@ async fn test_advance_runner_generates_claim_after_finishing_epoch() { let state = TestState::setup(&docker).await; const N: usize = 3; - tracing::info!("producing {} finish epoch events", N); + tracing::info!("adding {} finish epoch events", N); for i in 0..N { let advance = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -170,13 +170,13 @@ async fn test_advance_runner_generates_claim_after_finishing_epoch() { tx_hash: Hash::default(), }); let finish = RollupsData::FinishEpoch {}; - state.broker.produce_input_event(advance).await; - state.broker.produce_input_event(finish).await; + state.broker.add_input_event(advance).await; + state.broker.add_input_event(finish).await; } tracing::info!("waiting until the expected claims are generated"); state.server_manager.assert_session_ready().await; - let claims = state.broker.consume_n_claims(N).await; + let claims = state.broker.read_n_claims(N).await; // We don't verify the claim hash because it is not the resposability of the // advance_runner and because it changes every time we update the Cartesi Machine. 
assert_eq!(claims.len(), N); @@ -190,18 +190,18 @@ async fn test_advance_runner_finishes_epoch_when_it_has_no_inputs() { tracing::info!("finishing epochs with no inputs"); state .broker - .produce_input_event(RollupsData::FinishEpoch {}) + .add_input_event(RollupsData::FinishEpoch {}) .await; state .broker - .produce_input_event(RollupsData::FinishEpoch {}) + .add_input_event(RollupsData::FinishEpoch {}) .await; tracing::info!("waiting until second epoch is finished"); state.server_manager.assert_epoch_finished(1).await; - tracing::info!("checking it didn't produce claims"); - let claims = state.broker.consume_all_claims().await; + tracing::info!("checking it didn't add claims"); + let claims = state.broker.read_all_claims().await; assert_eq!(claims.len(), 0); } @@ -211,7 +211,7 @@ async fn test_advance_runner_finishes_epoch_when_it_has_no_inputs() { /// We can't simply wait for the epoch to be finished because the advance_runner /// still does tasks after that. async fn finish_epoch_and_wait_for_next_input(state: &TestState<'_>) { - tracing::info!("producing input, finish, and another input"); + tracing::info!("adding input, finish, and another input"); let payload = generate_payload(); let inputs = vec![ RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -233,7 +233,7 @@ async fn finish_epoch_and_wait_for_next_input(state: &TestState<'_>) { }), ]; for input in inputs { - state.broker.produce_input_event(input).await; + state.broker.add_input_event(input).await; } tracing::info!("waiting until second input is processed"); @@ -257,19 +257,16 @@ async fn test_advance_runner_does_not_generate_duplicate_claim() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing claim"); + tracing::info!("adding claim"); let rollups_claim = RollupsClaim::default(); - state - .broker - .produce_rollups_claim(rollups_claim.clone()) - .await; + state.broker.add_rollups_claim(rollups_claim.clone()).await; 
finish_epoch_and_wait_for_next_input(&state).await; tracing::info!("getting all claims"); - let produced_claims = state.broker.consume_all_claims().await; - assert_eq!(produced_claims.len(), 1); - assert_eq!(produced_claims[0].epoch_hash, rollups_claim.epoch_hash); + let added_claims = state.broker.read_all_claims().await; + assert_eq!(added_claims.len(), 1); + assert_eq!(added_claims[0].epoch_hash, rollups_claim.epoch_hash); } #[test_log::test(tokio::test)] @@ -282,7 +279,7 @@ async fn test_advance_runner_restore_session_after_restart() { tracing::info!("restarting advance_runner"); state.advance_runner.restart().await; - tracing::info!("producing another input and checking"); + tracing::info!("adding another input and checking"); let input = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { metadata: InputMetadata { input_index: 2, @@ -291,6 +288,6 @@ async fn test_advance_runner_restore_session_after_restart() { payload: generate_payload(), tx_hash: Hash::default(), }); - state.broker.produce_input_event(input).await; + state.broker.add_input_event(input).await; state.server_manager.assert_epoch_status(1, 2).await; } diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 3f50a1cb1..af4b20937 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -60,7 +60,7 @@ impl BrokerListener for DefaultBrokerListener { tracing::trace!("Waiting for claim with id {}", self.last_claim_id); let event = self .broker - .consume_blocking(&self.stream, &self.last_claim_id) + .read_blocking(&self.stream, &self.last_claim_id) .await .context(BrokerSnafu)?; @@ -105,7 +105,7 @@ mod tests { let config = BrokerConfig { redis_endpoint, - consume_timeout: 300000, + read_timeout: 300000, backoff: ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(1000)) .with_max_elapsed_time(Some(Duration::from_millis(3000))) @@ -116,7 +116,7 @@ mod tests { 
Ok((fixture, broker)) } - pub async fn produce_rollups_claims( + pub async fn add_rollups_claims( fixture: &BrokerFixture<'_>, n: usize, epoch_index_start: usize, @@ -125,18 +125,18 @@ mod tests { for i in 0..n { let mut rollups_claim = RollupsClaim::default(); rollups_claim.epoch_index = (i + epoch_index_start) as u64; - fixture.produce_rollups_claim(rollups_claim.clone()).await; + fixture.add_rollups_claim(rollups_claim.clone()).await; rollups_claims.push(rollups_claim); } rollups_claims } /// The last claim should trigger an `EndError` error. - pub async fn produce_last_claim( + pub async fn add_last_claim( fixture: &BrokerFixture<'_>, epoch_index: usize, ) -> Vec { - produce_rollups_claims(fixture, 1, epoch_index).await + add_rollups_claims(fixture, 1, epoch_index).await } // ------------------------------------------------------------------------------------------------ @@ -164,8 +164,8 @@ mod tests { let (fixture, mut broker_listener) = setup_broker(&docker, false).await.unwrap(); let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; + add_rollups_claims(&fixture, n, 0).await; + add_last_claim(&fixture, n).await; let result = broker_listener.listen().await; assert!(result.is_ok()); } @@ -175,7 +175,7 @@ mod tests { let docker = Cli::default(); let (fixture, mut broker_listener) = setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; + add_last_claim(&fixture, 0).await; let claim = broker_listener.listen().await; assert!(claim.is_ok()); } @@ -198,17 +198,17 @@ mod tests { let x = 2; println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; + add_rollups_claims(&fixture, x, 0).await; println!("Going to sleep for 2 seconds."); tokio::time::sleep(Duration::from_secs(2)).await; let y = 5; println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; + add_rollups_claims(&fixture, y, x).await; assert_eq!(x + y, n); - 
produce_last_claim(&fixture, n).await; + add_last_claim(&fixture, n).await; broker_listener_thread.await.unwrap(); } diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index 04180ea5a..1514e0b9e 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -59,7 +59,7 @@ impl Context { Ok(()) } - pub async fn enqueue_input( + pub async fn add_input( &mut self, input: &Input, broker: &impl BrokerSend, @@ -75,7 +75,7 @@ impl Context { self.finish_epoch_if_needed(input_block_number, broker) .await?; - broker.enqueue_input(self.inputs_sent, input).await?; + broker.add_input(self.inputs_sent, input).await?; self.metrics .advance_inputs_sent @@ -103,7 +103,7 @@ impl Context { // then there are no inputs by definition and the current epoch is empty. // // If last_finished_epoch is Some(x) and last_input_epoch is Some(x), - // then an epoch was finished and the last enqueued input belongs to that epoch; + // then an epoch was finished and the last added input belongs to that epoch; // meaning that any subsequent epochs (which includes the current one) are empty. 
// // If last_finished_epoch is Some(x) and last_input_epoch is Some(y), or @@ -451,11 +451,11 @@ mod tests { } // -------------------------------------------------------------------------------------------- - // enqueue_input + // add_input // -------------------------------------------------------------------------------------------- #[tokio::test] - async fn enqueue_input_ok() { + async fn add_input_ok() { let number_of_inputs_sent = 42; let last_input_epoch = Some(1); let last_finished_epoch = None; @@ -467,7 +467,7 @@ mod tests { let input = mock::new_input(22); let broker = mock::Broker::new(vec![], vec![]); - let result = context.enqueue_input(&input, &broker).await; + let result = context.add_input(&input, &broker).await; assert!(result.is_ok()); assert_eq!(context.inputs_sent, number_of_inputs_sent + 1); @@ -481,10 +481,10 @@ mod tests { } #[tokio::test] - async fn enqueue_input_broker_error() { + async fn add_input_broker_error() { let mut context = Context::default(); - let broker = mock::Broker::with_enqueue_input_error(); - let result = context.enqueue_input(&mock::new_input(82), &broker).await; + let broker = mock::Broker::with_add_input_error(); + let result = context.add_input(&mock::new_input(82), &broker).await; assert!(result.is_err()); } @@ -641,9 +641,9 @@ mod tests { for block in 0..=last_block { while let Some(input_block) = current_input_block { if block == input_block { - println!("\tenqueue_input(input_block: {})", block); + println!("\tadd_input(input_block: {})", block); let input = mock::new_input(block); - let result = context.enqueue_input(&input, &broker).await; + let result = context.add_input(&input, &broker).await; assert!(result.is_ok()); current_input_block = input_blocks.pop_front(); @@ -672,9 +672,9 @@ mod tests { let broker = mock::Broker::new(vec![], vec![]); for block in input_blocks { - println!("\tenqueue_input(input_block: {})\n", block); + println!("\tadd_input(input_block: {})\n", block); let input = 
mock::new_input(block); - let result = context.enqueue_input(&input, &broker).await; + let result = context.add_input(&input, &broker).await; assert!(result.is_ok()); } diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index ecab9a3d8..cbb1cb133 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -62,7 +62,7 @@ impl MachineDriver { dapp_input_box.inputs.skip(context.inputs_sent() as usize); for input in input_slice { - context.enqueue_input(&input, broker).await?; + context.add_input(&input, broker).await?; } Ok(()) @@ -152,7 +152,7 @@ mod tests { let mut throwaway_broker = new_broker(&context); for i in 0..=1 { assert!(context - .enqueue_input(&mock::new_input(i), &mut throwaway_broker) + .add_input(&mock::new_input(i), &mut throwaway_broker) .await .is_ok()); } @@ -170,7 +170,7 @@ mod tests { let mut throwaway_broker = new_broker(&context); for i in 0..=3 { assert!(context - .enqueue_input(&mock::new_input(i), &mut throwaway_broker) + .add_input(&mock::new_input(i), &mut throwaway_broker) .await .is_ok()); } diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index 951eb743e..9a70904eb 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -93,7 +93,7 @@ pub struct Broker { finish_epochs: AtomicU16, status_error: bool, - enqueue_input_error: bool, + add_input_error: bool, finish_epoch_error: bool, } @@ -105,7 +105,7 @@ impl Broker { events: Mutex::new(Vec::new()), finish_epochs: AtomicU16::new(0), status_error: false, - enqueue_input_error: false, + add_input_error: false, finish_epoch_error: false, } } @@ -120,9 +120,9 @@ impl Broker { broker } - pub fn with_enqueue_input_error() -> Self { + pub fn with_add_input_error() -> Self { let mut broker = Self::default(); - broker.enqueue_input_error = true; + broker.add_input_error = true; broker } @@ -173,13 +173,13 @@ impl 
BrokerStatus for Broker { #[async_trait] impl BrokerSend for Broker { - async fn enqueue_input( + async fn add_input( &self, input_index: u64, _: &Input, ) -> Result<(), BrokerFacadeError> { - if self.enqueue_input_error { - whatever!("enqueue_input error") + if self.add_input_error { + whatever!("add_input error") } else { let mut mutex_guard = self.events.lock().unwrap(); mutex_guard.deref_mut().push(Event::Input(input_index)); diff --git a/offchain/dispatcher/src/machine/mod.rs b/offchain/dispatcher/src/machine/mod.rs index 99a4f2623..52b3b24d8 100644 --- a/offchain/dispatcher/src/machine/mod.rs +++ b/offchain/dispatcher/src/machine/mod.rs @@ -21,7 +21,7 @@ pub trait BrokerStatus: std::fmt::Debug { #[async_trait] pub trait BrokerSend: std::fmt::Debug { - async fn enqueue_input( + async fn add_input( &self, input_index: u64, input: &Input, diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index 99daa425d..a4afcca67 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -19,14 +19,14 @@ pub enum BrokerFacadeError { #[snafu(display("error connecting to the broker"))] BrokerConnectionError { source: BrokerError }, - #[snafu(display("error peeking at the end of the stream"))] - PeekInputError { source: BrokerError }, + #[snafu(display("error reading event at the end of the stream"))] + ReadLatestInputError { source: BrokerError }, - #[snafu(display("error producing input event"))] - ProduceInputError { source: BrokerError }, + #[snafu(display("error adding input event"))] + AddInputError { source: BrokerError }, - #[snafu(display("error producing finish-epoch event"))] - ProduceFinishError { source: BrokerError }, + #[snafu(display("error adding finish-epoch event"))] + AddFinishError { source: BrokerError }, #[snafu(whatever, display("{message}"))] Whatever { @@ -68,7 +68,11 @@ impl BrokerFacade { &self, broker: &mut 
sync::MutexGuard<'_, Broker>, ) -> Result { - let event = self.peek(broker).await?; + tracing::trace!("reading last added event"); + let event = broker + .read_latest(&self.inputs_stream) + .await + .context(ReadLatestInputSnafu)?; let old_epoch_index = event .clone() @@ -88,21 +92,6 @@ impl BrokerFacade { Ok(status) } - - #[tracing::instrument(level = "trace", skip_all)] - async fn peek( - &self, - broker: &mut sync::MutexGuard<'_, Broker>, - ) -> Result>, BrokerFacadeError> { - tracing::trace!("peeking last produced event"); - let response = broker - .peek_latest(&self.inputs_stream) - .await - .context(PeekInputSnafu)?; - tracing::trace!(?response, "got response"); - - Ok(response) - } } #[async_trait] @@ -153,26 +142,26 @@ macro_rules! epoch_sanity_check { #[async_trait] impl BrokerSend for BrokerFacade { #[tracing::instrument(level = "trace", skip_all)] - async fn enqueue_input( + async fn add_input( &self, input_index: u64, input: &Input, ) -> Result<(), BrokerFacadeError> { - tracing::trace!(?input_index, ?input, "enqueueing input"); + tracing::trace!(?input_index, ?input, "adding input"); let mut broker = self.broker.lock().await; let status = self.broker_status(&mut broker).await?; let event = build_next_input(input, &status); - tracing::info!(?event, "producing input event"); + tracing::info!(?event, "adding input event"); input_sanity_check!(event, input_index); let id = broker - .produce(&self.inputs_stream, event) + .add(&self.inputs_stream, event) .await - .context(ProduceInputSnafu)?; - tracing::trace!(id, "produced event with id"); + .context(AddInputSnafu)?; + tracing::trace!(id, "added event with id"); Ok(()) } @@ -188,16 +177,16 @@ impl BrokerSend for BrokerFacade { let status = self.broker_status(&mut broker).await?; // Epoch number gets incremented here! 
let event = build_next_finish_epoch(&status); - tracing::info!(?event, "producing finish epoch event"); + tracing::info!(?event, "adding finish epoch event"); epoch_sanity_check!(event, inputs_sent_count); let id = broker - .produce(&self.inputs_stream, event) + .add(&self.inputs_stream, event) .await - .context(ProduceFinishSnafu)?; + .context(AddFinishSnafu)?; - tracing::trace!(id, "produce event with id"); + tracing::trace!(id, "add event with id"); Ok(()) } @@ -343,7 +332,7 @@ mod broker_facade_tests { async fn status_inputs_sent_count_equals_1() { let docker = Cli::default(); let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 1).await; + add_advance_state_inputs(&fixture, 1).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 1); } @@ -352,24 +341,21 @@ mod broker_facade_tests { async fn status_inputs_sent_count_equals_10() { let docker = Cli::default(); let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 10).await; + add_advance_state_inputs(&fixture, 10).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 10); } // -------------------------------------------------------------------------------------------- - // enqueue_input + // add_input // -------------------------------------------------------------------------------------------- #[tokio::test] - async fn enqueue_input_ok() { + async fn add_input_ok() { let docker = Cli::default(); let (_fixture, broker) = setup(&docker).await; for i in 0..3 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); + assert!(broker.add_input(i, &new_add_input()).await.is_ok()); } } @@ -377,26 +363,23 @@ mod broker_facade_tests { #[should_panic( expected = "assertion `left == right` failed\n left: 1\n right: 6" )] - async fn enqueue_input_assertion_error_1() { + async fn add_input_assertion_error_1() { 
let docker = Cli::default(); let (_fixture, broker) = setup(&docker).await; - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; + let _ = broker.add_input(5, &new_add_input()).await; } #[tokio::test] #[should_panic( expected = "assertion `left == right` failed\n left: 5\n right: 6" )] - async fn enqueue_input_assertion_error_2() { + async fn add_input_assertion_error_2() { let docker = Cli::default(); let (_fixture, broker) = setup(&docker).await; for i in 0..4 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); + assert!(broker.add_input(i, &new_add_input()).await.is_ok()); } - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; + let _ = broker.add_input(5, &new_add_input()).await; } // NOTE: cannot test result error because the dependency is not injectable. @@ -419,10 +402,10 @@ mod broker_facade_tests { let docker = Cli::default(); let (fixture, broker) = setup(&docker).await; let first_epoch_inputs = 3; - produce_advance_state_inputs(&fixture, first_epoch_inputs).await; - produce_finish_epoch_input(&fixture).await; + add_advance_state_inputs(&fixture, first_epoch_inputs).await; + add_finish_epoch_input(&fixture).await; let second_epoch_inputs = 7; - produce_advance_state_inputs(&fixture, second_epoch_inputs).await; + add_advance_state_inputs(&fixture, second_epoch_inputs).await; let total_inputs = first_epoch_inputs + second_epoch_inputs; assert!(broker.finish_epoch(total_inputs as u64).await.is_ok()); } @@ -457,7 +440,7 @@ mod broker_facade_tests { }; let config = BrokerConfig { redis_endpoint, - consume_timeout: 300000, + read_timeout: 300000, backoff: ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(1000)) .with_max_elapsed_time(Some(Duration::from_millis(3000))) @@ -475,7 +458,7 @@ mod broker_facade_tests { failable_setup(docker, false).await.unwrap() } - fn new_enqueue_input() -> Input { + fn new_add_input() -> Input { Input { sender: Arc::new(H160::random()), payload: vec![], 
@@ -491,10 +474,10 @@ mod broker_facade_tests { } } - async fn produce_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { + async fn add_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { for _ in 0..n { let _ = fixture - .produce_input_event(RollupsData::AdvanceStateInput( + .add_input_event(RollupsData::AdvanceStateInput( RollupsAdvanceStateInput { metadata: InputMetadata::default(), payload: Payload::default(), @@ -505,9 +488,7 @@ mod broker_facade_tests { } } - async fn produce_finish_epoch_input(fixture: &BrokerFixture<'_>) { - let _ = fixture - .produce_input_event(RollupsData::FinishEpoch {}) - .await; + async fn add_finish_epoch_input(fixture: &BrokerFixture<'_>) { + let _ = fixture.add_input_event(RollupsData::FinishEpoch {}).await; } } diff --git a/offchain/indexer/src/indexer.rs b/offchain/indexer/src/indexer.rs index 47dc38ed7..55edb658d 100644 --- a/offchain/indexer/src/indexer.rs +++ b/offchain/indexer/src/indexer.rs @@ -49,7 +49,7 @@ impl Indexer { tracing::info!("connected to broker; starting main loop"); loop { - let event = indexer.consume_event().await?; + let event = indexer.read_event().await?; let repository = indexer.repository.clone(); tokio::task::spawn_blocking(move || match event { IndexerEvent::Input(input) => { @@ -66,16 +66,16 @@ impl Indexer { } #[tracing::instrument(level = "trace", skip_all)] - async fn consume_event(&mut self) -> Result { + async fn read_event(&mut self) -> Result { tracing::info!(?self.state, "waiting for next event"); loop { - match self.broker.indexer_consume(&mut self.state).await { + match self.broker.indexer_read(&mut self.state).await { Ok(event) => { tracing::info!(?event, "received event"); return Ok(event); } Err(source) => match source { - BrokerError::ConsumeTimeout => { + BrokerError::ReadTimeout => { tracing::trace!("broker timed out, trying again"); continue; } diff --git a/offchain/indexer/tests/integration.rs b/offchain/indexer/tests/integration.rs index 7b8a51bd8..7aecc293c 
100644 --- a/offchain/indexer/tests/integration.rs +++ b/offchain/indexer/tests/integration.rs @@ -19,7 +19,7 @@ use test_fixtures::{BrokerFixture, RepositoryFixture}; use testcontainers::clients::Cli; use tokio::task::JoinHandle; -const BROKER_CONSUME_TIMEOUT: usize = 100; +const BROKER_READ_TIMEOUT: usize = 100; /// Starts one container with the broker, one container with the database, /// and the indexer in a background thread. @@ -38,7 +38,7 @@ async fn indexer_inserts_inputs() { const N: u64 = 3; let mut inputs = vec![]; for i in 0..N { - let input = state.produce_input_in_broker(i).await; + let input = state.add_input_in_broker(i).await; inputs.push(input); } @@ -57,9 +57,9 @@ async fn indexer_inserts_vouchers() { const N: u64 = 3; let mut vouchers = vec![]; for i in 0..N { - state.produce_input_in_broker(i).await; + state.add_input_in_broker(i).await; for j in 0..N { - let voucher = state.produce_voucher_in_broker(i, j).await; + let voucher = state.add_voucher_in_broker(i, j).await; vouchers.push(voucher) } } @@ -79,9 +79,9 @@ async fn indexer_inserts_notices() { const N: u64 = 3; let mut notices = vec![]; for i in 0..N { - state.produce_input_in_broker(i).await; + state.add_input_in_broker(i).await; for j in 0..N { - let notice = state.produce_notice_in_broker(i, j).await; + let notice = state.add_notice_in_broker(i, j).await; notices.push(notice); } } @@ -101,9 +101,9 @@ async fn indexer_inserts_reports() { const N: u64 = 3; let mut reports = vec![]; for i in 0..N { - state.produce_input_in_broker(i).await; + state.add_input_in_broker(i).await; for j in 0..N { - let report = state.produce_report_in_broker(i, j).await; + let report = state.add_report_in_broker(i, j).await; reports.push(report); } } @@ -123,18 +123,18 @@ async fn indexer_inserts_proofs() { const N: u64 = 3; let mut proofs = vec![]; for i in 0..N { - state.produce_input_in_broker(i).await; + state.add_input_in_broker(i).await; for j in 0..N { - state.produce_voucher_in_broker(i, j).await; - 
state.produce_notice_in_broker(i, j).await; + state.add_voucher_in_broker(i, j).await; + state.add_notice_in_broker(i, j).await; proofs.push( state - .produce_proof_in_broker(i, j, RollupsOutputEnum::Voucher) + .add_proof_in_broker(i, j, RollupsOutputEnum::Voucher) .await, ); proofs.push( state - .produce_proof_in_broker(i, j, RollupsOutputEnum::Notice) + .add_proof_in_broker(i, j, RollupsOutputEnum::Notice) .await, ); } @@ -152,11 +152,11 @@ async fn indexer_ignores_finish_epoch_and_insert_input_after() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("producing finish epoch"); + tracing::info!("adding finish epoch"); let data = RollupsData::FinishEpoch {}; - state.broker.produce_input_event(data).await; + state.broker.add_input_event(data).await; - let input_sent = state.produce_input_in_broker(0).await; + let input_sent = state.add_input_in_broker(0).await; let input_read = state.get_input_from_database(&input_sent).await; assert_input_eq(&input_sent, &input_read); } @@ -167,8 +167,8 @@ async fn indexer_does_not_override_existing_input() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - let original_input = state.produce_input_in_broker(0).await; - let _second_input = state.produce_input_in_broker(0).await; + let original_input = state.add_input_in_broker(0).await; + let _second_input = state.add_input_in_broker(0).await; let input_read = state.get_input_from_database(&original_input).await; assert_input_eq(&original_input, &input_read); } @@ -179,13 +179,13 @@ async fn indexer_inserts_input_after_broker_timeout() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - tracing::info!("sleeping so the broker consume times out in indexer"); + tracing::info!("sleeping so the broker read times out in indexer"); tokio::time::sleep(std::time::Duration::from_millis( - 2 * BROKER_CONSUME_TIMEOUT as u64, + 2 * BROKER_READ_TIMEOUT as u64, )) .await; - let input_sent = 
state.produce_input_in_broker(0).await; + let input_sent = state.add_input_in_broker(0).await; let input_read = state.get_input_from_database(&input_sent).await; assert_input_eq(&input_sent, &input_read); } @@ -196,7 +196,7 @@ async fn indexer_fails_to_insert_output_when_input_does_not_exist() { let docker = Cli::default(); let state = TestState::setup(&docker).await; - state.produce_voucher_in_broker(0, 0).await; + state.add_voucher_in_broker(0, 0).await; let error = state.get_indexer_error().await; assert!(matches!(error, IndexerError::RepositoryError { .. })); } @@ -227,7 +227,7 @@ impl TestState<'_> { .expect_err("indexer should exit with error") } - async fn produce_input_in_broker( + async fn add_input_in_broker( &self, input_index: u64, ) -> RollupsAdvanceStateInput { @@ -235,11 +235,11 @@ impl TestState<'_> { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as u64; - self.produce_input_in_broker_with_timestamp(input_index, timestamp) + self.add_input_in_broker_with_timestamp(input_index, timestamp) .await } - async fn produce_input_in_broker_with_timestamp( + async fn add_input_in_broker_with_timestamp( &self, input_index: u64, timestamp: u64, @@ -258,8 +258,8 @@ impl TestState<'_> { }; let data = RollupsData::AdvanceStateInput(input.clone()); - tracing::info!(?input, "producing input"); - self.broker.produce_input_event(data).await; + tracing::info!(?input, "adding input"); + self.broker.add_input_event(data).await; input } @@ -272,7 +272,7 @@ impl TestState<'_> { self.repository.retry(move |r| r.get_input(index)).await } - async fn produce_voucher_in_broker( + async fn add_voucher_in_broker( &self, input_index: u64, index: u64, @@ -285,8 +285,8 @@ impl TestState<'_> { }; let output = RollupsOutput::Voucher(voucher.clone()); - tracing::info!(?voucher, "producing voucher"); - self.broker.produce_output(output).await; + tracing::info!(?voucher, "adding voucher"); + self.broker.add_output(output).await; voucher } @@ -302,7 +302,7 @@ impl 
TestState<'_> { .await } - async fn produce_notice_in_broker( + async fn add_notice_in_broker( &self, input_index: u64, index: u64, @@ -314,8 +314,8 @@ impl TestState<'_> { }; let output = RollupsOutput::Notice(notice.clone()); - tracing::info!(?notice, "producing notice"); - self.broker.produce_output(output).await; + tracing::info!(?notice, "adding notice"); + self.broker.add_output(output).await; notice } @@ -331,7 +331,7 @@ impl TestState<'_> { .await } - async fn produce_report_in_broker( + async fn add_report_in_broker( &self, input_index: u64, index: u64, @@ -343,8 +343,8 @@ impl TestState<'_> { }; let output = RollupsOutput::Report(report.clone()); - tracing::info!(?report, "producing report"); - self.broker.produce_output(output).await; + tracing::info!(?report, "adding report"); + self.broker.add_output(output).await; report } @@ -360,7 +360,7 @@ impl TestState<'_> { .await } - async fn produce_proof_in_broker( + async fn add_proof_in_broker( &self, input_index: u64, output_index: u64, @@ -385,8 +385,8 @@ impl TestState<'_> { }; let output = RollupsOutput::Proof(proof.clone()); - tracing::info!(?proof, "producing proof"); - self.broker.produce_output(output).await; + tracing::info!(?proof, "adding proof"); + self.broker.add_output(output).await; proof } @@ -425,7 +425,7 @@ async fn spawn_indexer( ) -> JoinHandle> { let broker_config = BrokerConfig { redis_endpoint, - consume_timeout: BROKER_CONSUME_TIMEOUT, + read_timeout: BROKER_READ_TIMEOUT, backoff: Default::default(), }; diff --git a/offchain/rollups-events/src/broker/indexer.rs b/offchain/rollups-events/src/broker/indexer.rs index 01422167d..8abdade5b 100644 --- a/offchain/rollups-events/src/broker/indexer.rs +++ b/offchain/rollups-events/src/broker/indexer.rs @@ -44,11 +44,11 @@ impl IndexerState { } impl Broker { - /// Consume an event from the Input stream and if there is none, - /// consume from the Output stream. This is a blocking operation. 
+ /// Read an event from the Input stream and if there is none, + /// read from the Output stream. This is a blocking operation. /// Return IndexerEvent::Input if present or IndexerEvent::Output otherwise #[tracing::instrument(level = "trace", skip_all)] - pub async fn indexer_consume( + pub async fn indexer_read( &self, state: &mut IndexerState, ) -> Result { @@ -56,20 +56,15 @@ impl Broker { let output_stream_key = state.outputs_stream.key(); let mut reply = retry(self.backoff.clone(), || async { let stream_keys = [&input_stream_key, &output_stream_key]; - let last_consumed_ids = - [&state.inputs_last_id, &state.outputs_last_id]; - tracing::trace!( - ?stream_keys, - ?last_consumed_ids, - "consuming event" - ); + let last_read_ids = [&state.inputs_last_id, &state.outputs_last_id]; + tracing::trace!(?stream_keys, ?last_read_ids, "reading event"); let opts = StreamReadOptions::default() .count(1) - .block(self.consume_timeout); + .block(self.read_timeout); let reply: StreamReadReply = self .connection .clone() - .xread_options(&stream_keys, &last_consumed_ids, &opts) + .xread_options(&stream_keys, &last_read_ids, &opts) .await?; Ok(reply) }) @@ -100,7 +95,7 @@ impl Broker { return Ok(IndexerEvent::Output(event)); } - tracing::trace!("indexer consume timed out"); - Err(BrokerError::ConsumeTimeout) + tracing::trace!("indexer read timed out"); + Err(BrokerError::ReadTimeout) } } diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index fa622a3d1..59ef0658c 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -77,7 +77,7 @@ impl ConnectionLike for BrokerConnection { pub struct Broker { connection: BrokerConnection, backoff: ExponentialBackoff, - consume_timeout: usize, + read_timeout: usize, } impl Broker { @@ -119,13 +119,13 @@ impl Broker { Ok(Self { connection, backoff: config.backoff, - consume_timeout: config.consume_timeout, + read_timeout: config.read_timeout, }) } 
- /// Produce an event and return its id + /// Add an event to stream and return its id #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce( + pub async fn add( &mut self, stream: &S, payload: S::Payload, @@ -135,11 +135,7 @@ impl Broker { serde_json::to_string(&payload).context(InvalidPayloadSnafu)?; let event_id = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - payload, - "producing event" - ); + tracing::trace!(stream_key = stream.key(), payload, "adding event"); let event_id = self .connection .clone() @@ -155,15 +151,15 @@ impl Broker { Ok(event_id) } - /// Peek at the end of the stream + /// Read event at the end of the stream /// This function doesn't block; if there is no event in the stream it returns None. #[tracing::instrument(level = "trace", skip_all)] - pub async fn peek_latest( + pub async fn read_latest( &mut self, stream: &S, ) -> Result>, BrokerError> { let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!(stream_key = stream.key(), "peeking at the stream"); + tracing::trace!(stream_key = stream.key(), "reading at the stream"); let reply: StreamRangeReply = self .connection .clone() @@ -185,24 +181,24 @@ impl Broker { } #[tracing::instrument(level = "trace", skip_all)] - async fn _consume_blocking( + async fn _read_blocking( &mut self, stream: &S, - last_consumed_id: &str, + last_read_id: &str, ) -> Result, BrokerError> { let mut reply = retry(self.backoff.clone(), || async { tracing::trace!( stream_key = stream.key(), - last_consumed_id, - "consuming event" + last_read_id, + "reading event" ); let opts = StreamReadOptions::default() .count(1) - .block(self.consume_timeout); + .block(self.read_timeout); let reply: StreamReadReply = self .connection .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) + .xread_options(&[stream.key()], &[last_read_id], &opts) .await?; Ok(reply) @@ -211,58 +207,58 @@ impl Broker { .context(ConnectionSnafu)?; 
tracing::trace!("checking for timeout"); - let mut events = reply.keys.pop().ok_or(BrokerError::ConsumeTimeout)?; + let mut events = reply.keys.pop().ok_or(BrokerError::ReadTimeout)?; tracing::trace!("checking if event was received"); - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; + let event = events.ids.pop().ok_or(BrokerError::FailedToRead)?; tracing::trace!("parsing received event"); event.try_into() } - /// Consume the next event in stream + /// Read the next event in stream /// /// This function blocks until a new event is available /// and retries whenever a timeout happens instead of returning an error. /// - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. + /// To read the first event in the stream, `last_read_id` should be `INITIAL_ID`. #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_blocking( + pub async fn read_blocking( &mut self, stream: &S, - last_consumed_id: &str, + last_read_id: &str, ) -> Result, BrokerError> { loop { - let result = self._consume_blocking(stream, last_consumed_id).await; + let result = self._read_blocking(stream, last_read_id).await; - if let Err(BrokerError::ConsumeTimeout) = result { - tracing::trace!("consume timed out, retrying"); + if let Err(BrokerError::ReadTimeout) = result { + tracing::trace!("read event (blocking) timed out, retrying"); } else { return result; } } } - /// Consume the next event in stream without blocking + /// Read the next event in stream without blocking /// This function returns None if there are no more remaining events. - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. + /// To read the first event in the stream, `last_read_id` should be `INITIAL_ID`. 
#[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_nonblocking( + pub async fn read_nonblocking( &mut self, stream: &S, - last_consumed_id: &str, + last_read_id: &str, ) -> Result>, BrokerError> { let mut reply = retry(self.backoff.clone(), || async { tracing::trace!( stream_key = stream.key(), - last_consumed_id, - "consuming event (non-blocking)" + last_read_id, + "reading event (non-blocking)" ); let opts = StreamReadOptions::default().count(1); let reply: StreamReadReply = self .connection .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) + .xread_options(&[stream.key()], &[last_read_id], &opts) .await?; Ok(reply) @@ -272,7 +268,7 @@ impl Broker { tracing::trace!("checking if event was received"); if let Some(mut events) = reply.keys.pop() { - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; + let event = events.ids.pop().ok_or(BrokerError::FailedToRead)?; tracing::trace!("parsing received event"); Some(event.try_into()).transpose() } else { @@ -286,7 +282,7 @@ impl Broker { impl fmt::Debug for Broker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Broker") - .field("consume_timeout", &self.consume_timeout) + .field("read_timeout", &self.read_timeout) .finish() } } @@ -333,11 +329,11 @@ pub enum BrokerError { #[snafu(display("error connecting to Redis"))] ConnectionError { source: RedisError }, - #[snafu(display("failed to consume event"))] - FailedToConsume, + #[snafu(display("failed to read event"))] + FailedToRead, - #[snafu(display("timed out when consuming event"))] - ConsumeTimeout, + #[snafu(display("timed out when reading event"))] + ReadTimeout, #[snafu(display("event in invalid format"))] InvalidEvent, @@ -360,9 +356,9 @@ pub struct BrokerCLIConfig { #[arg(long, env, num_args = 1.., value_delimiter = ',')] redis_cluster_endpoints: Option>, - /// Timeout when consuming input events (in millis) + /// Timeout when reading input events (in millis) #[arg(long, env, 
default_value = "5000")] - broker_consume_timeout: usize, + broker_read_timeout: usize, /// The max elapsed time for backoff in ms #[arg(long, env, default_value = "120000")] @@ -378,7 +374,7 @@ pub enum BrokerEndpoint { #[derive(Debug, Clone)] pub struct BrokerConfig { pub redis_endpoint: BrokerEndpoint, - pub consume_timeout: usize, + pub read_timeout: usize, pub backoff: ExponentialBackoff, } @@ -410,7 +406,7 @@ impl From for BrokerConfig { }; BrokerConfig { redis_endpoint, - consume_timeout: cli_config.broker_consume_timeout, + read_timeout: cli_config.broker_read_timeout, backoff, } } diff --git a/offchain/rollups-events/tests/indexer.rs b/offchain/rollups-events/tests/indexer.rs index 1d5f84171..1dfd719ae 100644 --- a/offchain/rollups-events/tests/indexer.rs +++ b/offchain/rollups-events/tests/indexer.rs @@ -13,7 +13,7 @@ use testcontainers::{ clients::Cli, core::WaitFor, images::generic::GenericImage, Container, }; -pub const CONSUME_TIMEOUT: usize = 10; +pub const READ_TIMEOUT: usize = 10; pub const CHAIN_ID: u64 = 99; pub const DAPP_ADDRESS: Address = Address::new([0xfa; 20]); @@ -42,7 +42,7 @@ impl TestState<'_> { let backoff = ExponentialBackoff::default(); let config = BrokerConfig { redis_endpoint: BrokerEndpoint::Single(self.redis_endpoint.clone()), - consume_timeout: CONSUME_TIMEOUT, + read_timeout: READ_TIMEOUT, backoff, }; Broker::new(config) @@ -52,32 +52,31 @@ impl TestState<'_> { } #[test_log::test(tokio::test)] -async fn it_times_out_when_no_indexer_event_is_produced() { +async fn it_times_out_when_no_indexer_event_is_added() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let broker = state.create_broker().await; let mut indexer_state = IndexerState::new(&dapp_metadata()); let err = broker - .indexer_consume(&mut indexer_state) + .indexer_read(&mut indexer_state) .await - .expect_err("consume event worked but it should have failed"); - assert!(matches!(err, BrokerError::ConsumeTimeout)); + .expect_err("read event 
worked but it should have failed"); + assert!(matches!(err, BrokerError::ReadTimeout)); } #[test_log::test(tokio::test)] -async fn it_consumes_input_events() { +async fn it_reads_input_events() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; - // Produce input events + // Add input events let inputs = generate_inputs(); let metadata = dapp_metadata(); let stream = RollupsInputsStream::new(&metadata); - produce_all(&mut broker, &stream, &inputs).await; - // Consume indexer events - let consumed_events = - consume_all(&mut broker, &metadata, inputs.len()).await; - for (event, input) in consumed_events.iter().zip(&inputs) { + add_all(&mut broker, &stream, &inputs).await; + // Read indexer events + let read_events = read_all(&mut broker, &metadata, inputs.len()).await; + for (event, input) in read_events.iter().zip(&inputs) { assert!(matches!(event, IndexerEvent::Input( Event { @@ -91,19 +90,18 @@ async fn it_consumes_input_events() { } #[test_log::test(tokio::test)] -async fn it_consumes_output_events() { +async fn it_reads_output_events() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; - // Produce output events + // Add output events let outputs = generate_outputs(); let metadata = dapp_metadata(); let stream = RollupsOutputsStream::new(&metadata); - produce_all(&mut broker, &stream, &outputs).await; - // Consume indexer events - let consumed_events = - consume_all(&mut broker, &metadata, outputs.len()).await; - for (event, output) in consumed_events.iter().zip(&outputs) { + add_all(&mut broker, &stream, &outputs).await; + // Read indexer events + let read_events = read_all(&mut broker, &metadata, outputs.len()).await; + for (event, output) in read_events.iter().zip(&outputs) { assert!(matches!(event, IndexerEvent::Output( Event { @@ -117,24 +115,24 @@ async fn it_consumes_output_events() { } 
#[test_log::test(tokio::test)] -async fn it_consumes_inputs_before_outputs() { +async fn it_reads_inputs_before_outputs() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; - // First, produce output events + // First, add output events let outputs = generate_outputs(); let metadata = dapp_metadata(); let outputs_stream = RollupsOutputsStream::new(&metadata); - produce_all(&mut broker, &outputs_stream, &outputs).await; - // Then, produce input events + add_all(&mut broker, &outputs_stream, &outputs).await; + // Then, add input events let inputs = generate_inputs(); let inputs_stream = RollupsInputsStream::new(&metadata); - produce_all(&mut broker, &inputs_stream, &inputs).await; - // Finally, consume indexer events - let consumed_events = - consume_all(&mut broker, &metadata, outputs.len() + inputs.len()).await; + add_all(&mut broker, &inputs_stream, &inputs).await; + // Finally, read indexer events + let read_events = + read_all(&mut broker, &metadata, outputs.len() + inputs.len()).await; for (i, input) in inputs.iter().enumerate() { - assert!(matches!(&consumed_events[i], + assert!(matches!(&read_events[i], IndexerEvent::Input( Event { payload, @@ -145,7 +143,7 @@ async fn it_consumes_inputs_before_outputs() { )); } for (i, output) in outputs.iter().enumerate() { - assert!(matches!(&consumed_events[inputs.len() + i], + assert!(matches!(&read_events[inputs.len() + i], IndexerEvent::Output( Event { payload, @@ -193,20 +191,20 @@ fn generate_inputs() -> Vec { ] } -async fn produce_all( +async fn add_all( broker: &mut Broker, stream: &S, payloads: &[S::Payload], ) { for payload in payloads { broker - .produce(stream, payload.clone()) + .add(stream, payload.clone()) .await - .expect("failed to produce"); + .expect("failed to add"); } } -async fn consume_all( +async fn read_all( broker: &mut Broker, dapp_metadata: &DAppMetadata, n: usize, @@ -215,9 +213,9 @@ async fn consume_all( let mut payloads = 
vec![]; for _ in 0..n { let payload = broker - .indexer_consume(&mut state) + .indexer_read(&mut state) .await - .expect("failed to consume indexer payload"); + .expect("failed to read indexer payload"); payloads.push(payload); } payloads diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 6bfaa9cc8..15a172204 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -16,7 +16,7 @@ use rollups_events::{ }; const STREAM_KEY: &'static str = "test-stream"; -const CONSUME_TIMEOUT: usize = 10; +const READ_TIMEOUT: usize = 10; struct TestState<'d> { _node: Container<'d, GenericImage>, @@ -55,7 +55,7 @@ impl TestState<'_> { let config = BrokerConfig { redis_endpoint: BrokerEndpoint::Single(self.redis_endpoint.clone()), backoff: self.backoff.clone(), - consume_timeout: CONSUME_TIMEOUT, + read_timeout: READ_TIMEOUT, }; Broker::new(config) .await @@ -79,11 +79,11 @@ impl BrokerStream for MockStream { } #[test_log::test(tokio::test)] -async fn test_it_produces_events() { +async fn test_it_adds_events() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; - // Produce events using the Broker struct + // Add events using the Broker struct const N: usize = 3; let mut ids = vec![]; for i in 0..N { @@ -91,9 +91,9 @@ async fn test_it_produces_events() { data: i.to_string(), }; let id = broker - .produce(&MockStream {}, data) + .add(&MockStream {}, data) .await - .expect("failed to produce"); + .expect("failed to add"); ids.push(id); } // Check the events directly in Redis @@ -111,22 +111,22 @@ async fn test_it_produces_events() { } #[test_log::test(tokio::test)] -async fn test_it_peeks_in_stream_with_no_events() { +async fn test_it_reads_latest_in_stream_with_no_events() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; let 
event = broker - .peek_latest(&MockStream {}) + .read_latest(&MockStream {}) .await - .expect("failed to peek"); + .expect("failed to read latest"); assert!(matches!(event, None)); } #[test_log::test(tokio::test)] -async fn test_it_peeks_in_stream_with_multiple_events() { +async fn test_it_reads_latest_in_stream_with_multiple_events() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; - // Produce multiple events directly in Redis + // Add multiple events directly in Redis const N: usize = 3; for i in 0..N { let id = format!("1-{}", i); @@ -137,12 +137,12 @@ async fn test_it_peeks_in_stream_with_multiple_events() { .await .expect("failed to add events"); } - // Peek the event using the Broker struct + // Read latest event using the Broker struct let mut broker = state.create_broker().await; let event = broker - .peek_latest(&MockStream {}) + .read_latest(&MockStream {}) .await - .expect("failed to peek"); + .expect("failed to read latest"); if let Some(event) = event { assert_eq!(&event.id, "1-2"); assert_eq!(&event.payload.data, "2"); @@ -152,48 +152,48 @@ async fn test_it_peeks_in_stream_with_multiple_events() { } #[test_log::test(tokio::test)] -async fn test_it_fails_to_peek_event_in_invalid_format() { +async fn test_it_fails_to_read_latest_event_in_invalid_format() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; - // Produce event directly in Redis + // Add event directly in Redis let _: String = state .conn .xadd(STREAM_KEY, "1-0", &[("wrong_field", "0")]) .await .expect("failed to add events"); - // Peek the event using the Broker struct + // Read latest event using the Broker struct let mut broker = state.create_broker().await; let err = broker - .peek_latest(&MockStream {}) + .read_latest(&MockStream {}) .await .expect_err("failed to get error"); assert!(matches!(err, BrokerError::InvalidEvent)); } #[test_log::test(tokio::test)] -async fn test_it_fails_to_peek_event_with_invalid_data_encoding() { 
+async fn test_it_fails_to_read_latest_event_with_invalid_data_encoding() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; - // Produce event directly in Redis + // Add event directly in Redis let _: String = state .conn .xadd(STREAM_KEY, "1-0", &[("payload", "not a json")]) .await .expect("failed to add events"); - // Peek the event using the Broker struct + // Read latest event using the Broker struct let mut broker = state.create_broker().await; let err = broker - .peek_latest(&MockStream {}) + .read_latest(&MockStream {}) .await .expect_err("failed to get error"); assert!(matches!(err, BrokerError::InvalidPayload { .. })); } #[test_log::test(tokio::test)] -async fn test_it_consumes_events() { +async fn test_it_reads_events() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; - // Produce multiple events directly in Redis + // Queue multiple events directly in Redis const N: usize = 3; for i in 0..N { let id = format!("1-{}", i); @@ -204,14 +204,14 @@ async fn test_it_consumes_events() { .await .expect("failed to add events"); } - // Consume events using the Broker struct + // Read events using the Broker struct let mut broker = state.create_broker().await; let mut last_id = INITIAL_ID.to_owned(); for i in 0..N { let event = broker - .consume_blocking(&MockStream {}, &last_id) + .read_blocking(&MockStream {}, &last_id) .await - .expect("failed to consume"); + .expect("failed to read"); assert_eq!(event.id, format!("1-{}", i)); assert_eq!(event.payload.data, i.to_string()); last_id = event.id; @@ -219,7 +219,7 @@ async fn test_it_consumes_events() { } #[test_log::test(tokio::test)] -async fn test_it_blocks_until_event_is_produced() { +async fn test_it_blocks_until_event_is_added() { let docker = Cli::default(); let state = TestState::setup(&docker).await; // Spawn another thread that sends the event after a few ms @@ -237,19 +237,19 @@ async fn test_it_blocks_until_event_is_produced() { // In the main 
thread, wait for the expected event let mut broker = state.create_broker().await; let event = broker - .consume_blocking(&MockStream {}, "0") + .read_blocking(&MockStream {}, "0") .await - .expect("failed to consume event"); + .expect("failed to read event"); assert_eq!(event.id, "1-0"); assert_eq!(event.payload.data, "0"); handler.await.expect("failed to wait handler"); } #[test_log::test(tokio::test)] -async fn test_it_consumes_events_without_blocking() { +async fn test_it_reads_events_without_blocking() { let docker = Cli::default(); let mut state = TestState::setup(&docker).await; - // Produce multiple events directly in Redis + // Queue multiple events directly in Redis const N: usize = 3; for i in 0..N { let id = format!("1-{}", i); @@ -260,14 +260,14 @@ async fn test_it_consumes_events_without_blocking() { .await .expect("failed to add events"); } - // Consume events using the Broker struct + // Read events using the Broker struct let mut broker = state.create_broker().await; let mut last_id = INITIAL_ID.to_owned(); for i in 0..N { let event = broker - .consume_nonblocking(&MockStream {}, &last_id) + .read_nonblocking(&MockStream {}, &last_id) .await - .expect("failed to consume") + .expect("failed to read") .expect("expected event, got None"); assert_eq!(event.id, format!("1-{}", i)); assert_eq!(event.payload.data, i.to_string()); @@ -276,13 +276,13 @@ async fn test_it_consumes_events_without_blocking() { } #[test_log::test(tokio::test)] -async fn test_it_does_not_block_when_consuming_empty_stream() { +async fn test_it_does_not_block_when_reading_empty_stream() { let docker = Cli::default(); let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; let event = broker - .consume_nonblocking(&MockStream {}, INITIAL_ID) + .read_nonblocking(&MockStream {}, INITIAL_ID) .await - .expect("failed to peek"); + .expect("failed to read"); assert!(matches!(event, None)); } diff --git a/offchain/test-fixtures/src/broker.rs 
b/offchain/test-fixtures/src/broker.rs index 95b9ce19b..826c1acd7 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -15,7 +15,7 @@ use tokio::sync::Mutex; const CHAIN_ID: u64 = 0; const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); -const CONSUME_TIMEOUT: usize = 10_000; // ms +const READ_TIMEOUT: usize = 10_000; // ms pub struct BrokerFixture<'d> { _node: Container<'d, GenericImage>, @@ -56,7 +56,7 @@ impl BrokerFixture<'_> { let outputs_stream = RollupsOutputsStream::new(&metadata); let config = BrokerConfig { redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, + read_timeout: READ_TIMEOUT, backoff, }; @@ -102,22 +102,22 @@ impl BrokerFixture<'_> { /// Obtain the latest event from the rollups inputs stream #[tracing::instrument(level = "trace", skip_all)] - pub async fn get_latest_input_event(&self) -> Option> { - tracing::trace!("getting latest input event"); + pub async fn read_latest_input_event(&self) -> Option> { + tracing::trace!("reading latest input event"); self.client .lock() .await - .peek_latest(&self.inputs_stream) + .read_latest(&self.inputs_stream) .await - .expect("failed to get latest input event") + .expect("failed to read latest input event") } - /// Produce the input event given the data - /// Return the produced event id + /// Add the input event given the data + /// Return the added event id #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_input_event(&self, data: RollupsData) -> String { - tracing::trace!(?data, "producing rollups-input event"); - let last_event = self.get_latest_input_event().await; + pub async fn add_input_event(&self, data: RollupsData) -> String { + tracing::trace!(?data, "adding rollups-input event"); + let last_event = self.read_latest_input_event().await; let epoch_index = match last_event.as_ref() { Some(event) => match event.payload.data { RollupsData::AdvanceStateInput { .. 
} => { @@ -147,35 +147,35 @@ impl BrokerFixture<'_> { inputs_sent_count, data, }; - self.produce_raw_input_event(input).await + self.add_raw_input_event(input).await } - /// Produce the input event given the input - /// This may produce inconsistent inputs - /// Return the produced event id + /// Add the input event given the input + /// This may add inconsistent inputs + /// Return the added event id #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_raw_input_event(&self, input: RollupsInput) -> String { - tracing::trace!(?input, "producing rollups-input raw event"); + pub async fn add_raw_input_event(&self, input: RollupsInput) -> String { + tracing::trace!(?input, "adding rollups-input raw event"); self.client .lock() .await - .produce(&self.inputs_stream, input) + .add(&self.inputs_stream, input) .await - .expect("failed to produce event") + .expect("failed to add event") } - /// Produce the claim given the hash + /// Add the claim given the hash #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { - tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); + pub async fn add_rollups_claim(&self, rollups_claim: RollupsClaim) { + tracing::trace!(?rollups_claim.epoch_hash, "adding rollups-claim event"); { let last_claim = self .client .lock() .await - .peek_latest(&self.claims_stream) + .read_latest(&self.claims_stream) .await - .expect("failed to get latest claim"); + .expect("failed to read latest claim"); let epoch_index = match last_claim { Some(event) => event.payload.epoch_index + 1, None => 0, @@ -188,24 +188,24 @@ impl BrokerFixture<'_> { self.client .lock() .await - .produce(&self.claims_stream, rollups_claim) + .add(&self.claims_stream, rollups_claim) .await - .expect("failed to produce claim"); + .expect("failed to add claim"); } - /// Obtain all produced claims + /// Obtain all added claims #[tracing::instrument(level = "trace", skip_all)] - 
pub async fn consume_all_claims(&self) -> Vec { - tracing::trace!("consuming all rollups-claims events"); + pub async fn read_all_claims(&self) -> Vec { + tracing::trace!("reading all rollups-claims events"); let mut claims = vec![]; let mut last_id = INITIAL_ID.to_owned(); while let Some(event) = self .client .lock() .await - .consume_nonblocking(&self.claims_stream, &last_id) + .read_nonblocking(&self.claims_stream, &last_id) .await - .expect("failed to consume claim") + .expect("failed to read claim") { claims.push(event.payload); last_id = event.id; @@ -213,11 +213,11 @@ impl BrokerFixture<'_> { claims } - /// Obtain the first n produced claims + /// Obtain the first n added claims /// Panic in case of timeout #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_n_claims(&self, n: usize) -> Vec { - tracing::trace!(n, "consuming n rollups-claims events"); + pub async fn read_n_claims(&self, n: usize) -> Vec { + tracing::trace!(n, "reading n rollups-claims events"); let mut claims = vec![]; let mut last_id = INITIAL_ID.to_owned(); for _ in 0..n { @@ -225,24 +225,24 @@ impl BrokerFixture<'_> { .client .lock() .await - .consume_blocking(&self.claims_stream, &last_id) + .read_blocking(&self.claims_stream, &last_id) .await - .expect("failed to consume claim"); + .expect("failed to read claim"); claims.push(event.payload); last_id = event.id } claims } - /// Produce an output event + /// Add an output event #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_output(&self, output: RollupsOutput) { - tracing::trace!(?output, "producing rollups-outputs event"); + pub async fn add_output(&self, output: RollupsOutput) { + tracing::trace!(?output, "adding rollups-outputs event"); self.client .lock() .await - .produce(&self.outputs_stream, output) + .add(&self.outputs_stream, output) .await - .expect("failed to produce output"); + .expect("failed to add output"); } }