diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index c22a72032..58a37048a 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock @@ -485,6 +485,7 @@ name = "authority-claimer" version = "1.1.0" dependencies = [ "async-trait", + "backoff", "clap", "eth-tx-manager", "http-server", @@ -494,6 +495,8 @@ dependencies = [ "serde", "serde_json", "snafu", + "test-fixtures", + "testcontainers", "tokio", "tracing", ] diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 5bde4b681..c9f3c6cff 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -103,18 +103,11 @@ impl BrokerFacade { last_id: &str, ) -> Result> { tracing::trace!(last_id, "consuming rollups input event"); - - loop { - let result = self - .client - .consume_blocking(&self.inputs_stream, last_id) - .await; - if matches!(result, Err(BrokerError::ConsumeTimeout)) { - tracing::trace!("consume timed out, retrying"); - } else { - return result.context(BrokerInternalSnafu); - } - } + let result = self + .client + .consume_blocking(&self.inputs_stream, last_id) + .await; + return result.context(BrokerInternalSnafu); } /// Produce the rollups claim if it isn't in the stream yet diff --git a/offchain/authority-claimer/Cargo.toml b/offchain/authority-claimer/Cargo.toml index d27b30203..c6cdd72b9 100644 --- a/offchain/authority-claimer/Cargo.toml +++ b/offchain/authority-claimer/Cargo.toml @@ -23,3 +23,9 @@ serde_json.workspace = true snafu.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tracing.workspace = true + +[dev-dependencies] +test-fixtures = { path = "../test-fixtures" } + +backoff = { workspace = true, features = ["tokio"] } +testcontainers.workspace = true diff --git a/offchain/authority-claimer/src/broker_mock.rs b/offchain/authority-claimer/src/broker_mock.rs new file mode 100644 index 000000000..42da4450b --- /dev/null +++ b/offchain/authority-claimer/src/broker_mock.rs @@ -0,0 +1,75 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +use std::time::Duration; + +use backoff::ExponentialBackoffBuilder; +use rollups_events::{ + BrokerConfig, BrokerEndpoint, BrokerError, DAppMetadata, RedactedUrl, + RollupsClaim, Url, +}; +use snafu::Snafu; +use test_fixtures::BrokerFixture; +use testcontainers::clients::Cli; + +use crate::listener::DefaultBrokerListener; + +#[derive(Clone, Debug, Snafu)] +pub enum MockError { + EndError, + InternalError, + MockError, +} + +pub async fn setup_broker( + docker: &Cli, + should_fail: bool, +) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { + let fixture = BrokerFixture::setup(docker).await; + + let redis_endpoint = if should_fail { + BrokerEndpoint::Single(RedactedUrl::new( + Url::parse("https://invalid.com").unwrap(), + )) + } else { + fixture.redis_endpoint().clone() + }; + + let config = BrokerConfig { + redis_endpoint, + consume_timeout: 300000, + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(), + }; + let metadata = DAppMetadata { + chain_id: fixture.chain_id(), + dapp_address: fixture.dapp_address().clone(), + }; + let broker = DefaultBrokerListener::new(config, metadata).await?; + Ok((fixture, broker)) +} + +pub async fn produce_rollups_claims( + fixture: &BrokerFixture<'_>, + n: usize, + epoch_index_start: usize, +) -> Vec { + let mut rollups_claims = Vec::new(); + for i in 0..n { + let mut rollups_claim = RollupsClaim::default(); + rollups_claim.epoch_index = (i + epoch_index_start) as u64; + fixture.produce_rollups_claim(rollups_claim.clone()).await; + rollups_claims.push(rollups_claim); + } + rollups_claims +} + +/// The last claim should trigger an `EndError` error. +pub async fn produce_last_claim( + fixture: &BrokerFixture<'_>, + epoch_index: usize, +) -> Vec { + produce_rollups_claims(fixture, 1, epoch_index).await +} diff --git a/offchain/authority-claimer/src/checker.rs b/offchain/authority-claimer/src/checker.rs index ae375de6d..b9935f93e 100644 --- a/offchain/authority-claimer/src/checker.rs +++ b/offchain/authority-claimer/src/checker.rs @@ -6,11 +6,10 @@ use rollups_events::RollupsClaim; use snafu::Snafu; use std::fmt::Debug; -/// The `DuplicateChecker` checks if a given claim was already submitted -/// to the blockchain. +/// The `DuplicateChecker` checks if a given claim was already submitted to the blockchain. #[async_trait] pub trait DuplicateChecker: Debug { - type Error: snafu::Error; + type Error: snafu::Error + 'static; async fn is_duplicated_rollups_claim( &self, @@ -26,24 +25,24 @@ pub trait DuplicateChecker: Debug { pub struct DefaultDuplicateChecker; #[derive(Debug, Snafu)] -pub enum DefaultDuplicateCheckerError { +pub enum DuplicateCheckerError { Todo, } impl DefaultDuplicateChecker { - pub fn new() -> Result { + pub fn new() -> Result { todo!() } } #[async_trait] impl DuplicateChecker for DefaultDuplicateChecker { - type Error = DefaultDuplicateCheckerError; + type Error = DuplicateCheckerError; async fn is_duplicated_rollups_claim( &self, _rollups_claim: &RollupsClaim, ) -> Result { - todo!() + Err(DuplicateCheckerError::Todo) } } diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index fc5c853da..7e3596c5b 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use snafu::ResultExt; +use std::fmt::Debug; use tracing::{info, trace}; use crate::{ @@ -10,7 +11,7 @@ use crate::{ sender::TransactionSender, }; -/// The `AuthorityClaimer` starts an event loop that waits for claim messages +/// The `Claimer` starts an event loop that waits for claim messages /// from the broker, and then sends the claims to the blockchain. It checks to /// see if the claim is duplicated before sending. /// @@ -18,72 +19,96 @@ use crate::{ /// `TransactionSender`, to, respectivelly, listen for messages, check for /// duplicated claims, and send claims to the blockchain. #[async_trait] -pub trait AuthorityClaimer { - async fn start( - &self, - broker_listener: L, - duplicate_checker: C, - transaction_sender: S, - ) -> Result<(), AuthorityClaimerError> - where - L: BrokerListener + Send + Sync, - C: DuplicateChecker + Send + Sync, - S: TransactionSender + Send, - { +pub trait Claimer: Sized + Debug { + type Error: snafu::Error + 'static; + + async fn start(mut self) -> Result<(), Self::Error>; +} + +#[derive(Debug, snafu::Snafu)] +pub enum ClaimerError< + B: BrokerListener, + D: DuplicateChecker, + T: TransactionSender, +> { + #[snafu(display("duplicated claim error"))] + BrokerListenerError { source: B::Error }, + + #[snafu(display("duplicated claim error"))] + DuplicatedClaimError { source: D::Error }, + + #[snafu(display("transaction sender error"))] + TransactionSenderError { source: T::Error }, +} + +// ------------------------------------------------------------------------------------------------ +// DefaultClaimer +// ------------------------------------------------------------------------------------------------ + +/// The `DefaultClaimer` must be injected with a +/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`. +#[derive(Debug)] +pub struct DefaultClaimer< + B: BrokerListener, + D: DuplicateChecker, + T: TransactionSender, +> { + broker_listener: B, + duplicate_checker: D, + transaction_sender: T, +} + +impl + DefaultClaimer +{ + pub fn new( + broker_listener: B, + duplicate_checker: D, + transaction_sender: T, + ) -> Self { + Self { + broker_listener, + duplicate_checker, + transaction_sender, + } + } +} + +#[async_trait] +impl Claimer for DefaultClaimer +where + B: BrokerListener + Send + Sync + 'static, + D: DuplicateChecker + Send + Sync + 'static, + T: TransactionSender + Send + 'static, +{ + type Error = ClaimerError; + + async fn start(mut self) -> Result<(), Self::Error> { trace!("Starting the authority claimer loop"); - let mut transaction_sender = transaction_sender; loop { - let rollups_claim = broker_listener + let rollups_claim = self + .broker_listener .listen() .await .context(BrokerListenerSnafu)?; trace!("Got a claim from the broker: {:?}", rollups_claim); - let is_duplicated_rollups_claim = duplicate_checker + let is_duplicated_rollups_claim = self + .duplicate_checker .is_duplicated_rollups_claim(&rollups_claim) .await - .context(DuplicateCheckerSnafu)?; + .context(DuplicatedClaimSnafu)?; if is_duplicated_rollups_claim { trace!("It was a duplicated claim"); continue; } info!("Sending a new rollups claim"); - transaction_sender = transaction_sender - .send_rollups_claim(rollups_claim) + self.transaction_sender = self + .transaction_sender + .send_rollups_claim_transaction(rollups_claim) .await .context(TransactionSenderSnafu)? } } } - -#[derive(Debug, snafu::Snafu)] -pub enum AuthorityClaimerError< - L: BrokerListener + 'static, - C: DuplicateChecker + 'static, - S: TransactionSender + 'static, -> { - #[snafu(display("broker listener error"))] - BrokerListenerError { source: L::Error }, - - #[snafu(display("duplicate checker error"))] - DuplicateCheckerError { source: C::Error }, - - #[snafu(display("transaction sender error"))] - TransactionSenderError { source: S::Error }, -} - -// ------------------------------------------------------------------------------------------------ -// DefaultAuthorityClaimer -// ------------------------------------------------------------------------------------------------ - -#[derive(Default)] -pub struct DefaultAuthorityClaimer; - -impl DefaultAuthorityClaimer { - pub fn new() -> Self { - Self - } -} - -impl AuthorityClaimer for DefaultAuthorityClaimer {} diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index cb4002a24..21cd432c9 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -8,6 +8,9 @@ pub mod listener; pub mod metrics; pub mod sender; +#[cfg(test)] +mod broker_mock; + use config::Config; use rollups_events::DAppMetadata; use snafu::Error; @@ -15,7 +18,7 @@ use tracing::trace; use crate::{ checker::DefaultDuplicateChecker, - claimer::{AuthorityClaimer, DefaultAuthorityClaimer}, + claimer::{Claimer, DefaultClaimer}, listener::DefaultBrokerListener, metrics::AuthorityClaimerMetrics, sender::DefaultTransactionSender, @@ -27,43 +30,40 @@ pub async fn run(config: Config) -> Result<(), Box> { let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); - let dapp_address = config.authority_claimer_config.dapp_address; + let config = config.authority_claimer_config; + let dapp_address = config.dapp_address; let dapp_metadata = DAppMetadata { - chain_id: config.authority_claimer_config.tx_manager_config.chain_id, + chain_id: config.tx_manager_config.chain_id, dapp_address, }; // Creating the broker listener. trace!("Creating the broker listener"); - let broker_listener = DefaultBrokerListener::new( - config.authority_claimer_config.broker_config.clone(), - dapp_metadata.clone(), - metrics.clone(), - ) - .map_err(Box::new)?; + let broker_listener = + DefaultBrokerListener::new(config.broker_config, dapp_metadata.clone()) + .await?; // Creating the duplicate checker. trace!("Creating the duplicate checker"); - let duplicate_checker = DefaultDuplicateChecker::new().map_err(Box::new)?; + let duplicate_checker = DefaultDuplicateChecker::new()?; // Creating the transaction sender. trace!("Creating the transaction sender"); let transaction_sender = - DefaultTransactionSender::new(dapp_metadata, metrics) - .map_err(Box::new)?; + DefaultTransactionSender::new(dapp_metadata, metrics)?; // Creating the claimer loop. - let authority_claimer = DefaultAuthorityClaimer::new(); - let claimer_handle = authority_claimer.start( + let claimer = DefaultClaimer::new( broker_listener, duplicate_checker, transaction_sender, ); + let claimer_handle = claimer.start(); // Starting the HTTP server and the claimer loop. tokio::select! { - ret = http_server_handle => { ret.map_err(Box::new)? } - ret = claimer_handle => { ret.map_err(Box::new)? } + ret = http_server_handle => { ret? } + ret = claimer_handle => { ret? } }; unreachable!() diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 1e676472f..eb4b04bae 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -2,21 +2,20 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; -use rollups_events::{BrokerConfig, DAppMetadata, RollupsClaim}; -use snafu::Snafu; +use rollups_events::{ + Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, +}; +use snafu::ResultExt; use std::fmt::Debug; -use crate::metrics::AuthorityClaimerMetrics; - -/// The `BrokerListener` listens for new claims from the broker. -/// -/// The `listen` function should preferably yield to other processes while -/// waiting for new messages (instead of busy-waiting). +/// The `BrokerListener` listens for new claims from the broker #[async_trait] pub trait BrokerListener: Debug { - type Error: snafu::Error; + type Error: snafu::Error + 'static; - async fn listen(&self) -> Result; + /// Listen to claims + async fn listen(&mut self) -> Result; } // ------------------------------------------------------------------------------------------------ @@ -24,39 +23,131 @@ pub trait BrokerListener: Debug { // ------------------------------------------------------------------------------------------------ #[derive(Debug)] -pub struct DefaultBrokerListener; +pub struct DefaultBrokerListener { + broker: Broker, + stream: RollupsClaimsStream, + last_claim_id: String, +} -#[derive(Debug, Snafu)] -pub enum DefaultBrokerListenerError { - Todo, +#[derive(Debug, snafu::Snafu)] +pub enum BrokerListenerError { + #[snafu(display("broker error"))] + BrokerError { source: BrokerError }, } impl DefaultBrokerListener { - pub fn new( - _broker_config: BrokerConfig, - _dapp_metadata: DAppMetadata, - _metrics: AuthorityClaimerMetrics, - ) -> Result { - todo!() + pub async fn new( + broker_config: BrokerConfig, + dapp_metadata: DAppMetadata, + ) -> Result { + tracing::trace!("Connecting to the broker ({:?})", broker_config); + let broker = Broker::new(broker_config).await?; + let stream = RollupsClaimsStream::new(&dapp_metadata); + let last_claim_id = INITIAL_ID.to_string(); + Ok(Self { + broker, + stream, + last_claim_id, + }) } } #[async_trait] impl BrokerListener for DefaultBrokerListener { - type Error = DefaultBrokerListenerError; + type Error = BrokerListenerError; - async fn listen(&self) -> Result { - todo!() + async fn listen(&mut self) -> Result { + tracing::trace!("Waiting for claim with id {}", self.last_claim_id); + let event = self + .broker + .consume_blocking(&self.stream, &self.last_claim_id) + .await + .context(BrokerSnafu)?; + + Ok(event.payload) } } -// impl Stream for BrokerListener { -// type Item = u32; -// -// fn poll_next( -// self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll> { -// todo!() -// } -// } +#[cfg(test)] +mod tests { + use std::time::Duration; + use testcontainers::clients::Cli; + + use test_fixtures::BrokerFixture; + + use crate::{ + broker_mock, + listener::{BrokerListener, DefaultBrokerListener}, + }; + + async fn setup(docker: &Cli) -> (BrokerFixture, DefaultBrokerListener) { + broker_mock::setup_broker(docker, false).await.unwrap() + } + + #[tokio::test] + async fn instantiate_new_broker_listener_ok() { + let docker = Cli::default(); + let _ = setup(&docker).await; + } + + #[tokio::test] + async fn instantiate_new_broker_listener_error() { + let docker = Cli::default(); + let result = broker_mock::setup_broker(&docker, true).await; + assert!(result.is_err(), "setup_broker didn't fail as it should"); + let error = result.err().unwrap().to_string(); + assert_eq!(error, "error connecting to Redis"); + } + + #[tokio::test] + async fn start_broker_listener_with_one_claim_enqueued() { + let docker = Cli::default(); + let (fixture, mut broker_listener) = setup(&docker).await; + let n = 5; + broker_mock::produce_rollups_claims(&fixture, n, 0).await; + broker_mock::produce_last_claim(&fixture, n).await; + let result = broker_listener.listen().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn start_broker_listener_with_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut broker_listener) = setup(&docker).await; + broker_mock::produce_last_claim(&fixture, 0).await; + let claim = broker_listener.listen().await; + assert!(claim.is_ok()); + } + + #[tokio::test] + async fn start_broker_listener_listener_with_no_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut broker_listener) = setup(&docker).await; + let n = 7; + + let broker_listener_thread = tokio::spawn(async move { + println!("Spawned the broker-listener thread."); + let claim = broker_listener.listen().await; + assert!(claim.is_ok()); + }); + + println!("Going to sleep for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + + let x = 2; + println!("Creating {} claims.", x); + broker_mock::produce_rollups_claims(&fixture, x, 0).await; + + println!("Going to sleep for 2 seconds."); + tokio::time::sleep(Duration::from_secs(2)).await; + + let y = 5; + println!("Creating {} claims.", y); + broker_mock::produce_rollups_claims(&fixture, y, x).await; + + assert_eq!(x + y, n); + broker_mock::produce_last_claim(&fixture, n).await; + + broker_listener_thread.await.unwrap(); + } +} diff --git a/offchain/authority-claimer/src/sender.rs b/offchain/authority-claimer/src/sender.rs index e3816eaf9..5a16bc9db 100644 --- a/offchain/authority-claimer/src/sender.rs +++ b/offchain/authority-claimer/src/sender.rs @@ -13,12 +13,12 @@ use crate::metrics::AuthorityClaimerMetrics; /// It should wait for N blockchain confirmations. #[async_trait] pub trait TransactionSender: Sized + Debug { - type Error: snafu::Error; + type Error: snafu::Error + 'static; - /// The `send_rollups_claim` function consumes the `TransactionSender` - /// object and then returns it to avoid that processes use the transaction - /// sender concurrently. - async fn send_rollups_claim( + /// The `send_rollups_claim_transaction` function consumes the + /// `TransactionSender` object and then returns it to avoid + /// that processes use the transaction sender concurrently. + async fn send_rollups_claim_transaction( self, rollups_claim: RollupsClaim, ) -> Result; @@ -32,7 +32,7 @@ pub trait TransactionSender: Sized + Debug { pub struct DefaultTransactionSender; #[derive(Debug, Snafu)] -pub enum DefaultTransactionSenderError { +pub enum TransactionSenderError { Todo, } @@ -40,19 +40,19 @@ impl DefaultTransactionSender { pub fn new( _dapp_metadata: DAppMetadata, _metrics: AuthorityClaimerMetrics, - ) -> Result { + ) -> Result { todo!() } } #[async_trait] impl TransactionSender for DefaultTransactionSender { - type Error = DefaultTransactionSenderError; + type Error = TransactionSenderError; - async fn send_rollups_claim( + async fn send_rollups_claim_transaction( self, _rollups_claim: RollupsClaim, ) -> Result { - todo!() + Err(TransactionSenderError::Todo) } } diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index 72b8e3fa4..02b681108 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -184,11 +184,8 @@ impl Broker { } } - /// Consume the next event in stream - /// This function blocks until a new event is available. - /// To consume the first event in the stream, last_consumed_id should be INITIAL_ID. #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_blocking( + async fn _consume_blocking( &mut self, stream: &S, last_consumed_id: &str, @@ -223,9 +220,32 @@ impl Broker { event.try_into() } + /// Consume the next event in stream + /// + /// This function blocks until a new event is available + /// and retries whenever a timeout happens instead of returning an error. + /// + /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking( + &mut self, + stream: &S, + last_consumed_id: &str, + ) -> Result, BrokerError> { + loop { + let result = self._consume_blocking(stream, last_consumed_id).await; + + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } + /// Consume the next event in stream without blocking /// This function returns None if there are no more remaining events. - /// To consume the first event in the stream, last_consumed_id should be INITIAL_ID. + /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. #[tracing::instrument(level = "trace", skip_all)] pub async fn consume_nonblocking( &mut self, diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 316ef188a..6bfaa9cc8 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -286,15 +286,3 @@ async fn test_it_does_not_block_when_consuming_empty_stream() { .expect("failed to peek"); assert!(matches!(event, None)); } - -#[test_log::test(tokio::test)] -async fn test_it_times_out_when_no_event_is_produced() { - let docker = Cli::default(); - let state = TestState::setup(&docker).await; - let mut broker = state.create_broker().await; - let err = broker - .consume_blocking(&MockStream {}, "0") - .await - .expect_err("consume event worked but it should have failed"); - assert!(matches!(err, BrokerError::ConsumeTimeout)); -}