From 58084825a630383286753710a9ea11b2dea5dd91 Mon Sep 17 00:00:00 2001 From: Renan Date: Mon, 14 Aug 2023 17:50:46 -0300 Subject: [PATCH] feat(offchain): implement the broker-listener for the authority-claimer --- offchain/Cargo.lock | 4 +- offchain/advance-runner/src/broker.rs | 2 +- offchain/authority-claimer/Cargo.toml | 7 +- offchain/authority-claimer/src/checker.rs | 13 +- offchain/authority-claimer/src/claimer.rs | 128 ++++++------ offchain/authority-claimer/src/lib.rs | 71 +++---- offchain/authority-claimer/src/listener.rs | 196 +++++++++++++++---- offchain/authority-claimer/src/mock.rs | 139 +++++++++++++ offchain/authority-claimer/src/sender.rs | 20 +- offchain/rollups-events/src/broker/mod.rs | 33 +++- offchain/rollups-events/tests/integration.rs | 6 +- offchain/test-fixtures/src/broker.rs | 2 +- 12 files changed, 457 insertions(+), 164 deletions(-) create mode 100644 offchain/authority-claimer/src/mock.rs diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index d21e98800..373f5954e 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock @@ -481,15 +481,17 @@ name = "authority-claimer" version = "1.1.0" dependencies = [ "async-trait", + "backoff", "clap", "eth-tx-manager", - "ethers", "http-server", "rollups-events", "rusoto_core", "serde", "serde_json", "snafu", + "test-fixtures", + "testcontainers", "tokio", "tracing", "tracing-subscriber", diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 5bde4b681..65f8c677a 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -107,7 +107,7 @@ impl BrokerFacade { loop { let result = self .client - .consume_blocking(&self.inputs_stream, last_id) + .consume_blocking_deprecated(&self.inputs_stream, last_id) .await; if matches!(result, Err(BrokerError::ConsumeTimeout)) { tracing::trace!("consume timed out, retrying"); diff --git a/offchain/authority-claimer/Cargo.toml b/offchain/authority-claimer/Cargo.toml index 919527c11..a31679e12 100644 --- a/offchain/authority-claimer/Cargo.toml +++ b/offchain/authority-claimer/Cargo.toml @@ -16,7 +16,6 @@ rollups-events = { path = "../rollups-events" } async-trait.workspace = true clap = { workspace = true, features = ["derive"] } eth-tx-manager.workspace = true -ethers.workspace = true rusoto_core.workspace = true serde.workspace = true serde_json.workspace = true @@ -24,3 +23,9 @@ snafu.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing.workspace = true + +[dev-dependencies] +test-fixtures = { path = "../test-fixtures" } + +backoff = { workspace = true, features = ["tokio"] } +testcontainers.workspace = true diff --git a/offchain/authority-claimer/src/checker.rs b/offchain/authority-claimer/src/checker.rs index ae375de6d..b9935f93e 100644 --- a/offchain/authority-claimer/src/checker.rs +++ b/offchain/authority-claimer/src/checker.rs @@ -6,11 +6,10 @@ use rollups_events::RollupsClaim; use snafu::Snafu; use std::fmt::Debug; -/// The `DuplicateChecker` checks if a given claim was already submitted -/// to the blockchain. +/// The `DuplicateChecker` checks if a given claim was already submitted to the blockchain. #[async_trait] pub trait DuplicateChecker: Debug { - type Error: snafu::Error; + type Error: snafu::Error + 'static; async fn is_duplicated_rollups_claim( &self, @@ -26,24 +25,24 @@ pub trait DuplicateChecker: Debug { pub struct DefaultDuplicateChecker; #[derive(Debug, Snafu)] -pub enum DefaultDuplicateCheckerError { +pub enum DuplicateCheckerError { Todo, } impl DefaultDuplicateChecker { - pub fn new() -> Result { + pub fn new() -> Result { todo!() } } #[async_trait] impl DuplicateChecker for DefaultDuplicateChecker { - type Error = DefaultDuplicateCheckerError; + type Error = DuplicateCheckerError; async fn is_duplicated_rollups_claim( &self, _rollups_claim: &RollupsClaim, ) -> Result { - todo!() + Err(DuplicateCheckerError::Todo) } } diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index fc5c853da..b0e045d99 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -2,88 +2,80 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; +use rollups_events::RollupsClaim; use snafu::ResultExt; +use std::fmt::Debug; use tracing::{info, trace}; -use crate::{ - checker::DuplicateChecker, listener::BrokerListener, - sender::TransactionSender, -}; +use crate::{checker::DuplicateChecker, sender::TransactionSender}; -/// The `AuthorityClaimer` starts an event loop that waits for claim messages -/// from the broker, and then sends the claims to the blockchain. It checks to -/// see if the claim is duplicated before sending. -/// -/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and -/// `TransactionSender`, to, respectivelly, listen for messages, check for -/// duplicated claims, and send claims to the blockchain. +/// The `Claimer` sends claims to the blockchain. It checks +/// whether the claim is duplicated before sending. #[async_trait] -pub trait AuthorityClaimer { - async fn start( - &self, - broker_listener: L, - duplicate_checker: C, - transaction_sender: S, - ) -> Result<(), AuthorityClaimerError> - where - L: BrokerListener + Send + Sync, - C: DuplicateChecker + Send + Sync, - S: TransactionSender + Send, - { - trace!("Starting the authority claimer loop"); - let mut transaction_sender = transaction_sender; - loop { - let rollups_claim = broker_listener - .listen() - .await - .context(BrokerListenerSnafu)?; - trace!("Got a claim from the broker: {:?}", rollups_claim); +pub trait Claimer: Sized + Debug { + type Error: snafu::Error + 'static; - let is_duplicated_rollups_claim = duplicate_checker - .is_duplicated_rollups_claim(&rollups_claim) - .await - .context(DuplicateCheckerSnafu)?; - if is_duplicated_rollups_claim { - trace!("It was a duplicated claim"); - continue; - } - - info!("Sending a new rollups claim"); - transaction_sender = transaction_sender - .send_rollups_claim(rollups_claim) - .await - .context(TransactionSenderSnafu)? - } - } + async fn send_rollups_claim( + self, + rollups_claim: RollupsClaim, + ) -> Result; } #[derive(Debug, snafu::Snafu)] -pub enum AuthorityClaimerError< - L: BrokerListener + 'static, - C: DuplicateChecker + 'static, - S: TransactionSender + 'static, -> { - #[snafu(display("broker listener error"))] - BrokerListenerError { source: L::Error }, - - #[snafu(display("duplicate checker error"))] - DuplicateCheckerError { source: C::Error }, +pub enum ClaimerError { + #[snafu(display("duplicated claim error"))] + DuplicatedClaimError { source: D::Error }, #[snafu(display("transaction sender error"))] - TransactionSenderError { source: S::Error }, + TransactionSenderError { source: T::Error }, } -// ------------------------------------------------------------------------------------------------ -// DefaultAuthorityClaimer -// ------------------------------------------------------------------------------------------------ - -#[derive(Default)] -pub struct DefaultAuthorityClaimer; +/// The `AbstractClaimer` must be injected with a +/// `DuplicateChecker` and a `TransactionSender`. +#[derive(Debug)] +pub struct AbstractClaimer { + duplicate_checker: D, + transaction_sender: T, +} -impl DefaultAuthorityClaimer { - pub fn new() -> Self { - Self +impl AbstractClaimer { + pub fn new(duplicate_checker: D, transaction_sender: T) -> Self { + Self { + duplicate_checker, + transaction_sender, + } } } -impl AuthorityClaimer for DefaultAuthorityClaimer {} +#[async_trait] +impl Claimer for AbstractClaimer +where + D: DuplicateChecker + Send + Sync + 'static, + T: TransactionSender + Send + 'static, +{ + type Error = ClaimerError; + + async fn send_rollups_claim( + mut self, + rollups_claim: RollupsClaim, + ) -> Result { + let is_duplicated_rollups_claim = self + .duplicate_checker + .is_duplicated_rollups_claim(&rollups_claim) + .await + .context(DuplicatedClaimSnafu)?; + if is_duplicated_rollups_claim { + trace!("It was a duplicated claim"); + return Ok(self); + } + + info!("Sending a new rollups claim"); + self.transaction_sender = self + .transaction_sender + .send_rollups_claim_transaction(rollups_claim) + .await + .context(TransactionSenderSnafu)?; + + Ok(self) + } +} diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index fbab977a2..1571f6c57 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -8,6 +8,9 @@ pub mod listener; pub mod metrics; pub mod sender; +#[cfg(test)] +mod mock; + use config::Config; use rollups_events::DAppMetadata; use snafu::Error; @@ -15,57 +18,59 @@ use tracing::trace; use crate::{ checker::DefaultDuplicateChecker, - claimer::{AuthorityClaimer, DefaultAuthorityClaimer}, - listener::DefaultBrokerListener, + claimer::AbstractClaimer, + listener::{BrokerListener, DefaultBrokerListener}, metrics::AuthorityClaimerMetrics, sender::DefaultTransactionSender, }; pub async fn run(config: Config) -> Result<(), Box> { - tracing::info!(?config, "starting authority-claimer"); + tracing::info!(?config, "Starting the authority-claimer"); // Creating the metrics and health server. let metrics = AuthorityClaimerMetrics::new(); let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); - let dapp_address = config.authority_claimer_config.dapp_address; - let dapp_metadata = DAppMetadata { - chain_id: config.authority_claimer_config.tx_manager_config.chain_id, - dapp_address, - }; + let claimer_handle = { + let config = config.authority_claimer_config; + + let dapp_address = config.dapp_address; + let dapp_metadata = DAppMetadata { + chain_id: config.tx_manager_config.chain_id, + dapp_address, + }; - // Creating the broker listener. - trace!("Creating the broker listener"); - let broker_listener = DefaultBrokerListener::new( - config.authority_claimer_config.broker_config.clone(), - dapp_metadata.clone(), - metrics.clone(), - ) - .map_err(Box::new)?; + // Creating the duplicate checker. + trace!("Creating the duplicate checker"); + let duplicate_checker = DefaultDuplicateChecker::new()?; - // Creating the duplicate checker. - trace!("Creating the duplicate checker"); - let duplicate_checker = DefaultDuplicateChecker::new().map_err(Box::new)?; + // Creating the transaction sender. + trace!("Creating the transaction sender"); + let transaction_sender = DefaultTransactionSender::new( + dapp_metadata.clone(), + metrics.clone(), + )?; - // Creating the transaction sender. - trace!("Creating the transaction sender"); - let transaction_sender = - DefaultTransactionSender::new(dapp_metadata, metrics) - .map_err(Box::new)?; + // Creating the broker listener. + trace!("Creating the broker listener"); + let broker_listener = + DefaultBrokerListener::new(config.broker_config, dapp_metadata) + .await?; - // Creating the claimer loop. - let authority_claimer = DefaultAuthorityClaimer::new(); - let claimer_handle = authority_claimer.start( - broker_listener, - duplicate_checker, - transaction_sender, - ); + // Creating the claimer. + trace!("Creating the claimer"); + let claimer = + AbstractClaimer::new(duplicate_checker, transaction_sender); + + // Returning the claimer event loop. + broker_listener.start(claimer) + }; // Starting the HTTP server and the claimer loop. tokio::select! { - ret = http_server_handle => { ret.map_err(Box::new)? } - ret = claimer_handle => { ret.map_err(Box::new)? } + ret = http_server_handle => { ret? } + ret = claimer_handle => { ret? } }; unreachable!() diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 1e676472f..29dd2c98c 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -2,21 +2,24 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; -use rollups_events::{BrokerConfig, DAppMetadata, RollupsClaim}; -use snafu::Snafu; +use rollups_events::{ + Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaimsStream, + INITIAL_ID, +}; +use snafu::ResultExt; use std::fmt::Debug; +use tracing::trace; -use crate::metrics::AuthorityClaimerMetrics; +use crate::claimer::Claimer; -/// The `BrokerListener` listens for new claims from the broker. -/// -/// The `listen` function should preferably yield to other processes while -/// waiting for new messages (instead of busy-waiting). +/// The `BrokerListener` starts a perpetual loop that listens for new claims from +/// the broker and sends them to be processed by the injected `Claimer`. #[async_trait] -pub trait BrokerListener: Debug { - type Error: snafu::Error; +pub trait BrokerListener { + type Error: snafu::Error + 'static; - async fn listen(&self) -> Result; + /// Starts the polling loop. + async fn start(self, claimer: C) -> Result<(), Self::Error>; } // ------------------------------------------------------------------------------------------------ @@ -24,39 +27,164 @@ pub trait BrokerListener: Debug { // ------------------------------------------------------------------------------------------------ #[derive(Debug)] -pub struct DefaultBrokerListener; +pub struct DefaultBrokerListener { + broker: Broker, + stream: RollupsClaimsStream, + last_claim_id: String, +} + +#[derive(Debug, snafu::Snafu)] +pub enum BrokerListenerError { + #[snafu(display("broker error"))] + BrokerError { source: BrokerError }, -#[derive(Debug, Snafu)] -pub enum DefaultBrokerListenerError { - Todo, + #[snafu(display("claimer error"))] + ClaimerError { source: C::Error }, } impl DefaultBrokerListener { - pub fn new( - _broker_config: BrokerConfig, - _dapp_metadata: DAppMetadata, - _metrics: AuthorityClaimerMetrics, - ) -> Result { - todo!() + pub async fn new( + broker_config: BrokerConfig, + dapp_metadata: DAppMetadata, + ) -> Result { + tracing::trace!("Connecting to the broker ({:?})", broker_config); + let broker = Broker::new(broker_config).await?; + let stream = RollupsClaimsStream::new(&dapp_metadata); + let last_claim_id = INITIAL_ID.to_string(); + Ok(Self { + broker, + stream, + last_claim_id, + }) } } #[async_trait] -impl BrokerListener for DefaultBrokerListener { - type Error = DefaultBrokerListenerError; +impl BrokerListener for DefaultBrokerListener +where + C: Claimer + Send + 'static, +{ + type Error = BrokerListenerError; + + async fn start(mut self, mut claimer: C) -> Result<(), Self::Error> { + trace!("Starting the event loop"); + loop { + tracing::trace!("Waiting for claim with id {}", self.last_claim_id); + let event = self + .broker + .consume_blocking(&self.stream, &self.last_claim_id) + .await + .context(BrokerSnafu)?; + + let rollups_claim = event.payload.clone(); + trace!("Got a claim from the broker: {:?}", rollups_claim); + claimer = claimer + .send_rollups_claim(rollups_claim) + .await + .context(ClaimerSnafu)?; + tracing::trace!("Consumed event {:?}", event); - async fn listen(&self) -> Result { - todo!() + self.last_claim_id = event.id; + } } } -// impl Stream for BrokerListener { -// type Item = u32; -// -// fn poll_next( -// self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll> { -// todo!() -// } -// } +#[cfg(test)] +mod tests { + use std::time::Duration; + use testcontainers::clients::Cli; + + use test_fixtures::BrokerFixture; + + use crate::{ + listener::{BrokerListener, DefaultBrokerListener}, + mock, + }; + + async fn setup(docker: &Cli) -> (BrokerFixture, DefaultBrokerListener) { + mock::setup_broker(docker, false).await.unwrap() + } + + #[tokio::test] + async fn instantiate_new_broker_listener_ok() { + let docker = Cli::default(); + let _ = setup(&docker).await; + } + + #[tokio::test] + async fn instantiate_new_broker_listener_error() { + let docker = Cli::default(); + let result = mock::setup_broker(&docker, true).await; + assert!(result.is_err(), "setup_broker didn't fail as it should"); + let error = result.err().unwrap().to_string(); + assert_eq!(error, "error connecting to Redis"); + } + + #[tokio::test] + async fn start_broker_listener_with_claims_enqueued() { + let docker = Cli::default(); + let (fixture, broker) = setup(&docker).await; + let n = 5; + let claimer = mock::Claimer::new(n); + mock::produce_rollups_claims(&fixture, n, 0).await; + mock::produce_last_claim(&fixture, n).await; + let result = broker.start(claimer).await; + mock::assert_broker_listener_ended(result); + } + + #[tokio::test] + async fn start_broker_listener_with_no_claims_enqueued() { + let docker = Cli::default(); + let (fixture, broker) = setup(&docker).await; + let n = 7; + let claimer = mock::Claimer::new(n); + + let broker_listener = tokio::spawn(async move { + println!("Spawned the broker-listener thread."); + let result = broker.start(claimer).await; + mock::assert_broker_listener_ended(result); + }); + + println!("Going to sleep for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + + let x = 2; + println!("Creating {} claims.", x); + mock::produce_rollups_claims(&fixture, x, 0).await; + + println!("Going to sleep for 2 seconds."); + tokio::time::sleep(Duration::from_secs(2)).await; + + let y = 5; + println!("Creating {} claims.", y); + mock::produce_rollups_claims(&fixture, y, x).await; + + assert_eq!(x + y, n); + mock::produce_last_claim(&fixture, n).await; + + broker_listener.await.unwrap(); + } + + #[tokio::test] + async fn start_broker_listener_and_fail_without_consuming_claims() { + let docker = Cli::default(); + let (fixture, broker) = setup(&docker).await; + let n = 0; + let claimer = mock::Claimer::new_with_error(n); + mock::produce_last_claim(&fixture, n).await; + let result = broker.start(claimer).await; + mock::assert_broker_listener_failed(result); + } + + #[tokio::test] + async fn start_broker_listener_and_fail_after_consuming_some_claims() { + let docker = Cli::default(); + let (fixture, broker) = setup(&docker).await; + let n = 5; + let claimer = mock::Claimer::new_with_error(n); + mock::produce_rollups_claims(&fixture, n, 0).await; + mock::produce_last_claim(&fixture, n).await; + let result = broker.start(claimer).await; + mock::assert_broker_listener_failed(result); + } +} diff --git a/offchain/authority-claimer/src/mock.rs b/offchain/authority-claimer/src/mock.rs new file mode 100644 index 000000000..a52b25c72 --- /dev/null +++ b/offchain/authority-claimer/src/mock.rs @@ -0,0 +1,139 @@ +use std::time::Duration; + +use async_trait::async_trait; +use backoff::ExponentialBackoffBuilder; +use rollups_events::{ + BrokerConfig, BrokerEndpoint, BrokerError, DAppMetadata, RedactedUrl, + RollupsClaim, Url, +}; +use snafu::{OptionExt, Snafu}; +use test_fixtures::BrokerFixture; +use testcontainers::clients::Cli; + +use crate::{ + claimer, + listener::{BrokerListenerError, DefaultBrokerListener}, +}; + +#[derive(Debug)] +pub struct Claimer { + results: Vec>, +} + +#[derive(Clone, Debug, Snafu)] +pub enum ClaimerError { + EndError, + InternalError, + MockError, +} + +impl Claimer { + /// Creates a `Claimer` that proccesses `n` claims before returning + /// the `ClaimerError::EndError` error. + pub fn new(n: usize) -> Self { + let mut results: Vec> = vec![Ok(()); n]; + results.insert(0, Err(ClaimerError::EndError)); + Self { results } + } + + /// Creates a `Claimer` that proccesses `n` claims before returning + /// the `ClaimerError::MockError` error. + pub fn new_with_error(n: usize) -> Self { + let mut results: Vec> = vec![Ok(()); n]; + results.insert(0, Err(ClaimerError::MockError)); + Self { results } + } +} + +#[async_trait] +impl claimer::Claimer for Claimer { + type Error = ClaimerError; + + async fn send_rollups_claim( + mut self, + _: RollupsClaim, + ) -> Result { + let length = self.results.len() - 1; + println!("The mock claimer is consuming claim {}.", length); + self.results.pop().context(InternalSnafu)?.map(|_| self) + } +} + +pub fn assert_broker_listener_ended( + result: Result<(), BrokerListenerError>, +) { + assert!(result.is_err()); + match result { + Ok(_) => panic!("broker listener returned with Ok(())"), + Err(BrokerListenerError::ClaimerError { source }) => { + assert_eq!(source.to_string(), ClaimerError::EndError.to_string()) + } + Err(err) => panic!("broker listener failed with error {:?}", err), + } +} + +pub fn assert_broker_listener_failed( + result: Result<(), BrokerListenerError>, +) { + assert!(result.is_err()); + match result { + Ok(_) => panic!("broker listener returned with Ok(())"), + Err(BrokerListenerError::ClaimerError { source }) => { + assert_eq!(source.to_string(), ClaimerError::MockError.to_string()) + } + Err(err) => panic!("broker listener failed with error {:?}", err), + } +} + +pub async fn setup_broker( + docker: &Cli, + should_fail: bool, +) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { + let fixture = BrokerFixture::setup(docker).await; + + let redis_endpoint = if should_fail { + BrokerEndpoint::Single(RedactedUrl::new( + Url::parse("https://invalid.com").unwrap(), + )) + } else { + fixture.redis_endpoint().clone() + }; + + let config = BrokerConfig { + redis_endpoint, + consume_timeout: 300000, + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(), + }; + let metadata = DAppMetadata { + chain_id: fixture.chain_id(), + dapp_address: fixture.dapp_address().clone(), + }; + let broker = DefaultBrokerListener::new(config, metadata).await?; + Ok((fixture, broker)) +} + +pub async fn produce_rollups_claims( + fixture: &BrokerFixture<'_>, + n: usize, + epoch_index_start: usize, +) -> Vec { + let mut rollups_claims = Vec::new(); + for i in 0..n { + let mut rollups_claim = RollupsClaim::default(); + rollups_claim.epoch_index = (i + epoch_index_start) as u64; + fixture.produce_rollups_claim(rollups_claim.clone()).await; + rollups_claims.push(rollups_claim); + } + rollups_claims +} + +/// The last claim should trigger the `ClaimerError::EndError` error. +pub async fn produce_last_claim( + fixture: &BrokerFixture<'_>, + epoch_index: usize, +) -> Vec { + produce_rollups_claims(fixture, 1, epoch_index).await +} diff --git a/offchain/authority-claimer/src/sender.rs b/offchain/authority-claimer/src/sender.rs index e3816eaf9..5a16bc9db 100644 --- a/offchain/authority-claimer/src/sender.rs +++ b/offchain/authority-claimer/src/sender.rs @@ -13,12 +13,12 @@ use crate::metrics::AuthorityClaimerMetrics; /// It should wait for N blockchain confirmations. #[async_trait] pub trait TransactionSender: Sized + Debug { - type Error: snafu::Error; + type Error: snafu::Error + 'static; - /// The `send_rollups_claim` function consumes the `TransactionSender` - /// object and then returns it to avoid that processes use the transaction - /// sender concurrently. - async fn send_rollups_claim( + /// The `send_rollups_claim_transaction` function consumes the + /// `TransactionSender` object and then returns it to avoid + /// that processes use the transaction sender concurrently. + async fn send_rollups_claim_transaction( self, rollups_claim: RollupsClaim, ) -> Result; @@ -32,7 +32,7 @@ pub trait TransactionSender: Sized + Debug { pub struct DefaultTransactionSender; #[derive(Debug, Snafu)] -pub enum DefaultTransactionSenderError { +pub enum TransactionSenderError { Todo, } @@ -40,19 +40,19 @@ impl DefaultTransactionSender { pub fn new( _dapp_metadata: DAppMetadata, _metrics: AuthorityClaimerMetrics, - ) -> Result { + ) -> Result { todo!() } } #[async_trait] impl TransactionSender for DefaultTransactionSender { - type Error = DefaultTransactionSenderError; + type Error = TransactionSenderError; - async fn send_rollups_claim( + async fn send_rollups_claim_transaction( self, _rollups_claim: RollupsClaim, ) -> Result { - todo!() + Err(TransactionSenderError::Todo) } } diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index 0108327ca..3d27c8c9d 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -184,11 +184,9 @@ impl Broker { } } - /// Consume the next event in stream - /// This function blocks until a new event is available. - /// To consume the first event in the stream, last_consumed_id should be INITIAL_ID. + /// Deprecated, use `consume_blocking` instead #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_blocking( + pub async fn consume_blocking_deprecated( &mut self, stream: &S, last_consumed_id: &str, @@ -223,9 +221,34 @@ impl Broker { event.try_into() } + /// Consume the next event in stream + /// + /// This function blocks until a new event is available, + /// and it retries timeouts instead of returning an error. + /// + /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking( + &mut self, + stream: &S, + last_consumed_id: &str, + ) -> Result, BrokerError> { + loop { + let result = self + .consume_blocking_deprecated(stream, last_consumed_id) + .await; + + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } + /// Consume the next event in stream without blocking /// This function returns None if there are no more remaining events. - /// To consume the first event in the stream, last_consumed_id should be INITIAL_ID. + /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. #[tracing::instrument(level = "trace", skip_all)] pub async fn consume_nonblocking( &mut self, diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 316ef188a..b73c3b035 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -209,7 +209,7 @@ async fn test_it_consumes_events() { let mut last_id = INITIAL_ID.to_owned(); for i in 0..N { let event = broker - .consume_blocking(&MockStream {}, &last_id) + .consume_blocking_deprecated(&MockStream {}, &last_id) .await .expect("failed to consume"); assert_eq!(event.id, format!("1-{}", i)); @@ -237,7 +237,7 @@ async fn test_it_blocks_until_event_is_produced() { // In the main thread, wait for the expected event let mut broker = state.create_broker().await; let event = broker - .consume_blocking(&MockStream {}, "0") + .consume_blocking_deprecated(&MockStream {}, "0") .await .expect("failed to consume event"); assert_eq!(event.id, "1-0"); @@ -293,7 +293,7 @@ async fn test_it_times_out_when_no_event_is_produced() { let state = TestState::setup(&docker).await; let mut broker = state.create_broker().await; let err = broker - .consume_blocking(&MockStream {}, "0") + .consume_blocking_deprecated(&MockStream {}, "0") .await .expect_err("consume event worked but it should have failed"); assert!(matches!(err, BrokerError::ConsumeTimeout)); diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index 081adfaab..36ec162d0 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -225,7 +225,7 @@ impl BrokerFixture<'_> { .client .lock() .await - .consume_blocking(&self.claims_stream, &last_id) + .consume_blocking_deprecated(&self.claims_stream, &last_id) .await .expect("failed to consume claim"); claims.push(event.payload);