diff --git a/CHANGELOG.md b/CHANGELOG.md index 24ab5cade..0e8db2b64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Added + +- Added support for the experimental multidapp claimer. +- Added the `DAPP_CONTRACT_ADDRESS` environment variable to the `authority-claimer`. If left unset, the service instantiates the MultidappClaimer, which reads dapp addresses from Redis. + ## [1.5.0] 2024-07-22 ### Added diff --git a/internal/node/services.go b/internal/node/services.go index 892c3db16..af1a8dfcf 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -117,6 +117,8 @@ func newAuthorityClaimer(c config.NodeConfig, workDir string) services.CommandSe s.Env = append(s.Env, fmt.Sprintf("HISTORY_ADDRESS=%v", c.ContractsHistoryAddress)) s.Env = append(s.Env, fmt.Sprintf("AUTHORITY_ADDRESS=%v", c.ContractsAuthorityAddress)) s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_ADDRESS=%v", c.ContractsInputBoxAddress)) + s.Env = append(s.Env, fmt.Sprintf("DAPP_CONTRACT_ADDRESS=%v", + c.ContractsApplicationAddress)) s.Env = append(s.Env, fmt.Sprintf("GENESIS_BLOCK=%v", c.ContractsInputBoxDeploymentBlockNumber)) s.Env = append(s.Env, fmt.Sprintf("AUTHORITY_CLAIMER_HTTP_SERVER_PORT=%v", diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index 65f3decd8..9f990e30f 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock @@ -4451,6 +4451,7 @@ dependencies = [ "prometheus-client", "redacted", "redis", + "regex", "serde", "serde_json", "snafu 0.8.2", diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 1993b6e19..f550edc5d 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -55,7 +55,7 @@ impl BrokerFacade { let 
client = Broker::new(config).await.context(BrokerInternalSnafu)?; let inputs_stream = RollupsInputsStream::new(&dapp_metadata); let outputs_stream = RollupsOutputsStream::new(&dapp_metadata); - let claims_stream = RollupsClaimsStream::new(dapp_metadata.chain_id); + let claims_stream = RollupsClaimsStream::new(&dapp_metadata); Ok(Self { client, inputs_stream, diff --git a/offchain/authority-claimer/README.md b/offchain/authority-claimer/README.md index fc91cb73c..2e094aa17 100644 --- a/offchain/authority-claimer/README.md +++ b/offchain/authority-claimer/README.md @@ -2,3 +2,25 @@ This service submits rollups claims consumed from the broker to the blockchain using the [tx-manager crate](https://github.com/cartesi/tx-manager). It runs at the end of every epoch, when new claims are inserted on the broker. + +### Multidapp Mode + +(This is an **experimental** feature! Don't try it unless you know exactly what you are doing!) + +The `authority-claimer` can be configured to run in "multidapp mode". +To do so, the `DAPP_CONTRACT_ADDRESS` environment variable must be left unset. +This will force the claimer to instantiate a `MultidappBrokerListener` instead of a `DefaultBrokerListener`. + +In multidapp mode, the claimer reads claims from the broker for multiple dapps. +All dapps must share the same History contract and the same chain ID. + +Instead of using environment variables, + the claimer will get the list of dapp addresses from Redis, + through the `experimental-dapp-addresses-config` key. +You must set this key with a string of comma separated (`", "`) + hex encoded addresses (without `"0x"`) + **before** starting the claimer. +You may rewrite this key at any time, and the claimer will adjust accordingly to the new list of addresses. +The claimer stops with an error if the list is empty. + +Example key value: `"0202020202020202020202020202020202020202, 0505050505050505050505050505050505050505"`. 
diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index 43c475d20..203603cef 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -4,7 +4,9 @@ use async_trait::async_trait; use snafu::ResultExt; use std::fmt::Debug; -use tracing::{info, trace}; +use tracing::{debug, info}; + +use rollups_events::Address; use crate::{ checker::DuplicateChecker, listener::BrokerListener, @@ -31,6 +33,9 @@ pub enum ClaimerError< D: DuplicateChecker, T: TransactionSender, > { + #[snafu(display("invalid app address {:?}", app_address))] + InvalidAppAddress { app_address: Address }, + #[snafu(display("broker listener error"))] BrokerListenerError { source: B::Error }, @@ -84,14 +89,14 @@ where type Error = ClaimerError; async fn start(mut self) -> Result<(), Self::Error> { - trace!("Starting the authority claimer loop"); + debug!("Starting the authority claimer loop"); loop { let rollups_claim = self .broker_listener .listen() .await .context(BrokerListenerSnafu)?; - trace!("Got a claim from the broker: {:?}", rollups_claim); + debug!("Got a claim from the broker: {:?}", rollups_claim); let is_duplicated_rollups_claim = self .duplicate_checker @@ -99,7 +104,7 @@ where .await .context(DuplicatedClaimSnafu)?; if is_duplicated_rollups_claim { - trace!("It was a duplicated claim"); + debug!("It was a duplicated claim"); continue; } diff --git a/offchain/authority-claimer/src/config/cli.rs b/offchain/authority-claimer/src/config/cli.rs index 5df9c6ffd..db1979aeb 100644 --- a/offchain/authority-claimer/src/config/cli.rs +++ b/offchain/authority-claimer/src/config/cli.rs @@ -6,9 +6,10 @@ use eth_tx_manager::{ config::{TxEnvCLIConfig as TxManagerCLIConfig, TxManagerConfig}, Priority, }; +use ethers::utils::hex; use log::{LogConfig, LogEnvCliConfig}; use redacted::Redacted; -use rollups_events::{BrokerCLIConfig, BrokerConfig}; +use rollups_events::{Address, BrokerCLIConfig, BrokerConfig}; use 
rusoto_core::Region; use snafu::ResultExt; use std::{fs, str::FromStr}; @@ -47,6 +48,10 @@ pub(crate) struct AuthorityClaimerCLI { #[command(flatten)] pub contracts_config: ContractsCLIConfig, + /// Address of rollups dapp + #[arg(long, env)] + pub dapp_contract_address: Option, + /// Genesis block for reading blockchain events #[arg(long, env, default_value_t = 1)] pub genesis_block: u64, @@ -72,6 +77,17 @@ impl TryFrom for AuthorityClaimerConfig { ContractsConfig::try_from(cli_config.contracts_config) .context(ContractsSnafu)?; + let dapp_contract_address: Option
= cli_config + .dapp_contract_address + .map(|raw_dapp_contract_address| { + let address: [u8; 20] = + hex::decode(&raw_dapp_contract_address[2..]) + .expect("Dapp json parse error") + .try_into() + .expect("Dapp address with wrong size"); + address.into() + }); + Ok(AuthorityClaimerConfig { tx_manager_config, tx_signing_config, @@ -79,6 +95,7 @@ impl TryFrom for AuthorityClaimerConfig { broker_config, log_config, contracts_config, + dapp_address: dapp_contract_address, genesis_block: cli_config.genesis_block, }) } diff --git a/offchain/authority-claimer/src/config/mod.rs b/offchain/authority-claimer/src/config/mod.rs index 21d1932b2..175163188 100644 --- a/offchain/authority-claimer/src/config/mod.rs +++ b/offchain/authority-claimer/src/config/mod.rs @@ -13,7 +13,7 @@ use eth_tx_manager::{config::TxManagerConfig, Priority}; use http_server::HttpServerConfig; use log::LogConfig; use redacted::Redacted; -use rollups_events::BrokerConfig; +use rollups_events::{Address, BrokerConfig}; use rusoto_core::Region; #[derive(Debug, Clone)] @@ -30,6 +30,7 @@ pub struct AuthorityClaimerConfig { pub broker_config: BrokerConfig, pub log_config: LogConfig, pub contracts_config: ContractsConfig, + pub dapp_address: Option
, pub genesis_block: u64, } diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index 944efb4b4..2c8a7e45d 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -10,6 +10,7 @@ pub mod sender; pub mod signer; use config::Config; +use listener::{BrokerListener, MultidappBrokerListener}; use snafu::Error; use tracing::trace; @@ -22,21 +23,39 @@ use crate::{ }; pub async fn run(config: Config) -> Result<(), Box> { - // Creating the metrics and health server. let metrics = AuthorityClaimerMetrics::new(); + let dapp_address = config.authority_claimer_config.dapp_address.clone(); + if let Some(dapp_address) = dapp_address { + trace!("Creating the default broker listener"); + let broker_listener = DefaultBrokerListener::new( + config.authority_claimer_config.broker_config.clone(), + config.authority_claimer_config.tx_manager_config.chain_id, + dapp_address, + ) + .await?; + _run(metrics, config, broker_listener).await + } else { + trace!("Creating the multidapp broker listener"); + let broker_listener = MultidappBrokerListener::new( + config.authority_claimer_config.broker_config.clone(), + config.authority_claimer_config.tx_manager_config.chain_id, + ) + .await?; + _run(metrics, config, broker_listener).await + } +} + +async fn _run( + metrics: AuthorityClaimerMetrics, + config: Config, + broker_listener: B, +) -> Result<(), Box> { let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); - let config = config.authority_claimer_config; - let chain_id = config.tx_manager_config.chain_id; - // Creating the broker listener. - trace!("Creating the broker listener"); - let broker_listener = - DefaultBrokerListener::new(config.broker_config.clone(), chain_id) - .await?; + let chain_id = config.tx_manager_config.chain_id; - // Creating the duplicate checker. 
trace!("Creating the duplicate checker"); let duplicate_checker = DefaultDuplicateChecker::new( config.tx_manager_config.provider_http_endpoint.clone(), @@ -46,18 +65,18 @@ pub async fn run(config: Config) -> Result<(), Box> { ) .await?; - // Creating the transaction sender. trace!("Creating the transaction sender"); let transaction_sender = DefaultTransactionSender::new(config.clone(), chain_id, metrics) .await?; - // Creating the claimer loop. + // Creating the claimer. let claimer = DefaultClaimer::new( broker_listener, duplicate_checker, transaction_sender, ); + let claimer_handle = claimer.start(); // Starting the HTTP server and the claimer loop. diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 3f50a1cb1..57124ceb7 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -3,25 +3,34 @@ use async_trait::async_trait; use rollups_events::{ - Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, + Address, Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, }; use snafu::ResultExt; -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; -/// The `BrokerListener` listens for new claims from the broker +/// The `BrokerListener` listens for new claims from the broker. 
#[async_trait] pub trait BrokerListener: Debug { type Error: snafu::Error + 'static; - /// Listen to claims async fn listen(&mut self) -> Result; } +#[derive(Debug, snafu::Snafu)] +pub enum BrokerListenerError { + #[snafu(display("broker error"))] + BrokerError { source: BrokerError }, + + #[snafu(display("no dapps"))] + NoDapps, +} + // ------------------------------------------------------------------------------------------------ // DefaultBrokerListener // ------------------------------------------------------------------------------------------------ +/// The `DefaultBrokerListener` only listens for claims from one dapp. #[derive(Debug)] pub struct DefaultBrokerListener { broker: Broker, @@ -29,20 +38,19 @@ pub struct DefaultBrokerListener { last_claim_id: String, } -#[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { - #[snafu(display("broker error"))] - BrokerError { source: BrokerError }, -} - impl DefaultBrokerListener { pub async fn new( broker_config: BrokerConfig, chain_id: u64, + dapp_address: Address, ) -> Result { tracing::trace!("Connecting to the broker ({:?})", broker_config); let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address, + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); let last_claim_id = INITIAL_ID.to_string(); Ok(Self { broker, @@ -70,31 +78,134 @@ impl BrokerListener for DefaultBrokerListener { } } +// ------------------------------------------------------------------------------------------------ +// MultidappBrokerListener +// ------------------------------------------------------------------------------------------------ + +/// The `MultidappBrokerListener` listens for claims from multiple dapps. +/// It updates its internal list of dapps by consuming from redis' DappsStream. 
+#[derive(Debug)] +pub struct MultidappBrokerListener { + broker: Broker, + streams: HashMap, // stream => last-claim-id + chain_id: u64, +} + +impl MultidappBrokerListener { + pub async fn new( + broker_config: BrokerConfig, + chain_id: u64, + ) -> Result { + tracing::trace!("Connecting to the broker ({:?})", broker_config); + let broker = Broker::new(broker_config).await?; + let streams = HashMap::new(); + Ok(Self { + broker, + streams, + chain_id, + }) + } +} + +impl MultidappBrokerListener { + /// Reads addresses from the DappStream and + /// converts them to the stream to last-consumed-id map. + async fn update_streams(&mut self) -> Result<(), BrokerListenerError> { + let initial_id = INITIAL_ID.to_string(); + + let streams: Vec<_> = self + .broker + .get_dapps() + .await + .context(BrokerSnafu)? + .into_iter() + .map(|dapp_address| { + let dapp_metadata = &DAppMetadata { + chain_id: self.chain_id, + dapp_address, + }; + let stream = RollupsClaimsStream::new(dapp_metadata); + let id = self.streams.get(&stream).unwrap_or(&initial_id); + (stream, id.clone()) + }) + .collect(); + if streams.is_empty() { + return Err(BrokerListenerError::NoDapps); + } + + self.streams = HashMap::from_iter(streams); + Ok(()) + } +} + +#[async_trait] +impl BrokerListener for MultidappBrokerListener { + type Error = BrokerListenerError; + + async fn listen(&mut self) -> Result { + tracing::trace!("Waiting for claim"); + + self.update_streams().await?; + + let (stream, event) = self + .broker + .consume_blocking_from_multiple_streams(self.streams.clone()) + .await + .context(BrokerSnafu)?; + + // Updates the last-consumed-id from the stream. 
+ let replaced = self.streams.insert(stream.clone(), event.id); + assert!(replaced.is_some()); + + Ok(event.payload) + } +} + +// ------------------------------------------------------------------------------------------------ +// Tests +// ------------------------------------------------------------------------------------------------ + #[cfg(test)] mod tests { use std::time::Duration; use testcontainers::clients::Cli; - use test_fixtures::BrokerFixture; - - use crate::listener::{BrokerListener, DefaultBrokerListener}; + use test_fixtures::{broker::ClaimerMultidappBrokerFixture, BrokerFixture}; use backoff::ExponentialBackoffBuilder; use rollups_events::{ - BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, RollupsClaim, - Url, + Address, BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, + RollupsClaim, Url, }; - // ------------------------------------------------------------------------------------------------ + use crate::listener::{BrokerListener, BrokerListenerError}; + + use super::{DefaultBrokerListener, MultidappBrokerListener}; + + // -------------------------------------------------------------------------------------------- // Broker Mock - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- - pub async fn setup_broker( + fn config(redis_endpoint: BrokerEndpoint) -> BrokerConfig { + BrokerConfig { + redis_endpoint, + consume_timeout: 300000, + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(), + } + } + + // -------------------------------------------------------------------------------------------- + // DefaultListener Tests + // -------------------------------------------------------------------------------------------- + + async fn setup_default_broker_listener( docker: &Cli, 
should_fail: bool, ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { let fixture = BrokerFixture::setup(docker).await; - let redis_endpoint = if should_fail { BrokerEndpoint::Single(RedactedUrl::new( Url::parse("https://invalid.com").unwrap(), @@ -102,21 +213,16 @@ mod tests { } else { fixture.redis_endpoint().clone() }; - - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let broker = - DefaultBrokerListener::new(config, fixture.chain_id()).await?; + let broker = DefaultBrokerListener::new( + config(redis_endpoint), + fixture.chain_id(), + fixture.dapp_address().clone(), + ) + .await?; Ok((fixture, broker)) } - pub async fn produce_rollups_claims( + async fn default_produce_claims( fixture: &BrokerFixture<'_>, n: usize, epoch_index_start: usize, @@ -132,59 +238,57 @@ mod tests { } /// The last claim should trigger an `EndError` error. 
- pub async fn produce_last_claim( + async fn default_produce_last_claim( fixture: &BrokerFixture<'_>, epoch_index: usize, ) -> Vec { - produce_rollups_claims(fixture, 1, epoch_index).await + default_produce_claims(fixture, 1, epoch_index).await } - // ------------------------------------------------------------------------------------------------ - // Listener Unit Tests - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- #[tokio::test] - async fn instantiate_new_broker_listener_ok() { + async fn instantiate_new_default_broker_listener_ok() { let docker = Cli::default(); - let _ = setup_broker(&docker, false).await; + let _ = setup_default_broker_listener(&docker, false).await; } #[tokio::test] - async fn instantiate_new_broker_listener_error() { + async fn instantiate_new_default_broker_listener_error() { let docker = Cli::default(); - let result = setup_broker(&docker, true).await; - assert!(result.is_err(), "setup_broker didn't fail as it should"); + let result = setup_default_broker_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); let error = result.err().unwrap().to_string(); assert_eq!(error, "error connecting to Redis"); } #[tokio::test] - async fn start_broker_listener_with_one_claim_enqueued() { + async fn start_default_broker_listener_with_one_claim_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; + default_produce_claims(&fixture, n, 0).await; + default_produce_last_claim(&fixture, n).await; let result = broker_listener.listen().await; assert!(result.is_ok()); } #[tokio::test] - async fn 
start_broker_listener_with_claims_enqueued() { + async fn start_default_broker_listener_with_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; + setup_default_broker_listener(&docker, false).await.unwrap(); + default_produce_last_claim(&fixture, 0).await; let claim = broker_listener.listen().await; assert!(claim.is_ok()); } #[tokio::test] - async fn start_broker_listener_listener_with_no_claims_enqueued() { + async fn start_default_broker_listener_listener_with_no_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 7; let broker_listener_thread = tokio::spawn(async move { @@ -198,17 +302,240 @@ mod tests { let x = 2; println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; + default_produce_claims(&fixture, x, 0).await; println!("Going to sleep for 2 seconds."); tokio::time::sleep(Duration::from_secs(2)).await; let y = 5; println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; + default_produce_claims(&fixture, y, x).await; assert_eq!(x + y, n); - produce_last_claim(&fixture, n).await; + default_produce_last_claim(&fixture, n).await; + + broker_listener_thread.await.unwrap(); + } + + // -------------------------------------------------------------------------------------------- + // MultidappListener Tests + // -------------------------------------------------------------------------------------------- + + async fn setup_multidapp_listener( + docker: &Cli, + should_fail: bool, + ) -> Result< + ( + ClaimerMultidappBrokerFixture, + MultidappBrokerListener, + Vec
, + ), + BrokerError, + > { + let chain_id: u64 = 0; + let dapp_addresses: Vec
= vec![ + [3; 20].into(), // + [5; 20].into(), // + [7; 20].into(), // + ]; + let dapps: Vec<_> = dapp_addresses + .clone() + .into_iter() + .map(|dapp_address| (chain_id, dapp_address)) + .collect(); + + let fixture = + ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await; + fixture.set_dapps(dapp_addresses.clone()).await; + + let redis_endpoint = if should_fail { + BrokerEndpoint::Single(RedactedUrl::new( + Url::parse("https://invalid.com").unwrap(), + )) + } else { + fixture.redis_endpoint().clone() + }; + + let listener = + MultidappBrokerListener::new(config(redis_endpoint), chain_id) + .await?; + Ok((fixture, listener, dapp_addresses)) + } + + // For each index in indexes, this function produces a claim + // with rollups_claim.dapp_address = dapps[index] + // and rollups_claim.epoch_index = epochs[index]. + // It then increments epochs[index]. + async fn multidapp_produce_claims( + fixture: &ClaimerMultidappBrokerFixture<'_>, + epochs: &mut Vec, + dapps: &Vec
, + indexes: &Vec, + ) { + for &index in indexes { + let epoch = *epochs.get(index).unwrap(); + + let mut rollups_claim = RollupsClaim::default(); + rollups_claim.dapp_address = dapps.get(index).unwrap().clone(); + rollups_claim.epoch_index = epoch; + fixture.produce_rollups_claim(rollups_claim.clone()).await; + + epochs[index] = epoch + 1; + } + } + + // Asserts that listener.listen() will return indexes.len() claims, + // and that for each index in indexes + // there is an unique claim for which claim.dapp_address = dapps[index]. + async fn assert_listen( + listener: &mut MultidappBrokerListener, + dapps: &Vec
, + indexes: &Vec, + ) { + let mut dapps: Vec<_> = indexes + .iter() + .map(|&index| dapps.get(index).unwrap().clone()) + .collect(); + for _ in indexes.clone() { + println!("Listening..."); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp = result.unwrap().dapp_address; + + let index = dapps.iter().position(|expected| *expected == dapp); + assert!(index.is_some()); + dapps.remove(index.unwrap()); + } + assert!(dapps.is_empty()); + } + + // -------------------------------------------------------------------------------------------- + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_ok() { + let docker = Cli::default(); + let _ = setup_multidapp_listener(&docker, false).await; + } + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_error() { + let docker = Cli::default(); + let result = setup_multidapp_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); + let error = result.err().unwrap().to_string(); + assert_eq!(error, "error connecting to Redis"); + } + + #[tokio::test] + async fn multidapp_listen_with_no_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.set_dapps(vec![]).await; + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![0, 1, 2]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + let result = listener.listen().await; + assert!(result.is_err()); + assert_eq!( + BrokerListenerError::NoDapps.to_string(), + result.unwrap_err().to_string() + ); + } + + #[tokio::test] + async fn multidapp_listen_with_one_dapp() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.set_dapps(vec![dapps.get(0).unwrap().clone()]).await; + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 1, 2, 0]; + 
multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &vec![0]).await; + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let index = 0; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &vec![index]) + .await; + + let result = listener.listen().await; + assert!(result.is_ok()); + + let expected_dapp = dapps.get(index).unwrap().clone(); + let actual_dapp = result.unwrap().dapp_address; + assert_eq!(expected_dapp, actual_dapp); + } + + #[tokio::test] + async fn multidapp_listen_with_multiple_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 1, 2]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_for_each_dapp_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_no_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let first_batch = vec![0, 1, 2, 0]; + let second_batch = vec![2, 1, 0, 0, 2, 1]; + + let broker_listener_thread = { + let _dapps = dapps.clone(); + let _first_batch = first_batch.clone(); + let 
_second_batch = second_batch.clone(); + tokio::spawn(async move { + println!("Spawned the broker-listener thread."); + assert_listen(&mut listener, &_dapps, &_first_batch).await; + println!("All good with the first batch!"); + assert_listen(&mut listener, &_dapps, &_second_batch).await; + println!("All good with the second batch!"); + }) + }; + + println!("Going to sleep for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + + println!("Producing the first batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &first_batch) + .await; + println!("Epochs: {:?}", epochs); + + println!("Going to sleep for 2 seconds."); + tokio::time::sleep(Duration::from_secs(2)).await; + + println!("Producing the second batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &second_batch) + .await; + println!("Epochs: {:?}", epochs); broker_listener_thread.await.unwrap(); } diff --git a/offchain/rollups-events/Cargo.toml b/offchain/rollups-events/Cargo.toml index 7eda7413b..d784f8024 100644 --- a/offchain/rollups-events/Cargo.toml +++ b/offchain/rollups-events/Cargo.toml @@ -12,6 +12,7 @@ base64.workspace = true clap = { workspace = true, features = ["derive", "env"] } hex.workspace = true prometheus-client.workspace = true +regex.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true snafu.workspace = true diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index fa622a3d1..74e33587d 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -14,14 +14,21 @@ use redis::{ }; use serde::{de::DeserializeOwned, Serialize}; use snafu::{ResultExt, Snafu}; +use std::collections::HashMap; +use std::convert::identity; use std::fmt; +use std::str::FromStr; use std::time::Duration; pub use redacted::{RedactedUrl, Url}; +use crate::Address; + pub mod indexer; pub const INITIAL_ID: &str = "0"; +const 
DAPPS_KEY: &str = "experimental-dapp-addresses-config"; +const DAPPS_DIVIDER: &str = ", "; /// The `BrokerConnection` enum implements the `ConnectionLike` trait /// to satisfy the `AsyncCommands` trait bounds. @@ -280,6 +287,120 @@ impl Broker { Ok(None) } } + + #[tracing::instrument(level = "trace", skip_all)] + async fn _consume_blocking_from_multiple_streams( + &mut self, + streams: &Vec, + last_consumed_ids: &Vec, + ) -> Result<(S, Event), BrokerError> { + let reply = retry(self.backoff.clone(), || async { + let stream_keys: Vec = streams + .iter() + .map(|stream| stream.key().to_string()) + .collect(); + + let opts = StreamReadOptions::default() + .count(1) + .block(self.consume_timeout); + let reply: StreamReadReply = self + .connection + .clone() + .xread_options(&stream_keys, &last_consumed_ids, &opts) + .await?; + + Ok(reply) + }) + .await + .context(ConnectionSnafu)?; + + tracing::trace!("checking for timeout"); + if reply.keys.is_empty() { + return Err(BrokerError::ConsumeTimeout); + } + + tracing::trace!("checking if any events were received"); + for mut stream_key in reply.keys { + if let Some(event) = stream_key.ids.pop() { + tracing::trace!("parsing received event"); + let stream = S::from_key(stream_key.key); + let event = event.try_into()?; + return Ok((stream, event)); + } + } + return Err(BrokerError::FailedToConsume); + } + + /// Consume the next event from one of the streams. + /// + /// This function blocks until a new event is available in one of the streams, + /// and retries whenever a timeout happens instead of returning an error. + /// + /// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`. 
+ #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking_from_multiple_streams< + S: BrokerMultiStream, + >( + &mut self, + streams: HashMap, // streams to last-consumed-ids + ) -> Result<(S, Event), BrokerError> { + let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = + streams.into_iter().map(identity).unzip(); + + loop { + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; + + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } + + /// Gets the dapp addresses. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn get_dapps(&mut self) -> Result, BrokerError> { + retry(self.backoff.clone(), || async { + tracing::trace!(key = DAPPS_KEY, "getting key"); + let reply: String = self.connection.clone().get(DAPPS_KEY).await?; + if reply.is_empty() { + return Ok(vec![]); + } + Ok(reply + .split(DAPPS_DIVIDER) + .map(|s| Address::from_str(s).unwrap()) + .collect::>()) + }) + .await + .context(ConnectionSnafu) + } + + /// Sets the dapp addresses. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn set_dapps( + &mut self, + dapp_addresses: Vec
, + ) -> Result<(), BrokerError> { + tracing::trace!(key = DAPPS_KEY, "setting key"); + let dapp_addresses: Vec<_> = dapp_addresses + .iter() + .map(|address| address.to_string()) + .collect(); + let dapp_addresses = dapp_addresses.join(DAPPS_DIVIDER); + let _: () = self + .connection + .clone() + .set(DAPPS_KEY, dapp_addresses) + .await + .unwrap(); + Ok(()) + } } /// Custom implementation of Debug because ConnectionManager doesn't implement debug @@ -297,6 +418,10 @@ pub trait BrokerStream { fn key(&self) -> &str; } +pub trait BrokerMultiStream: BrokerStream { + fn from_key(key: String) -> Self; +} + /// Event that goes through the broker #[derive(Debug, Clone, Eq, PartialEq)] pub struct Event { diff --git a/offchain/rollups-events/src/common.rs b/offchain/rollups-events/src/common.rs index 1665896f7..c3e7a35d0 100644 --- a/offchain/rollups-events/src/common.rs +++ b/offchain/rollups-events/src/common.rs @@ -6,6 +6,7 @@ use prometheus_client::encoding::EncodeLabelValue; use prometheus_client::encoding::LabelValueEncoder; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::Write; +use std::str::FromStr; pub const ADDRESS_SIZE: usize = 20; pub const HASH_SIZE: usize = 32; @@ -40,6 +41,14 @@ impl From<[u8; N]> for HexArray { } } +impl FromStr for HexArray { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_value(serde_json::Value::String(s.to_string())) + } +} + impl Serialize for HexArray { fn serialize(&self, serializer: S) -> Result where @@ -69,6 +78,16 @@ impl<'de, const N: usize> Deserialize<'de> for HexArray { } } +impl ToString for HexArray { + fn to_string(&self) -> String { + let s = serde_json::to_string(self).unwrap(); + let mut chars = s.chars(); + chars.next(); + chars.next_back(); + chars.as_str().to_string() + } +} + impl std::fmt::Debug for HexArray { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", hex::encode(self.inner())) diff --git 
a/offchain/rollups-events/src/lib.rs b/offchain/rollups-events/src/lib.rs index 8c75b8ff8..8565637bf 100644 --- a/offchain/rollups-events/src/lib.rs +++ b/offchain/rollups-events/src/lib.rs @@ -10,7 +10,8 @@ mod rollups_stream; pub use broker::{ indexer, Broker, BrokerCLIConfig, BrokerConfig, BrokerEndpoint, - BrokerError, BrokerStream, Event, RedactedUrl, Url, INITIAL_ID, + BrokerError, BrokerMultiStream, BrokerStream, Event, RedactedUrl, Url, + INITIAL_ID, }; pub use common::{Address, Hash, Payload, ADDRESS_SIZE, HASH_SIZE}; pub use rollups_claims::{RollupsClaim, RollupsClaimsStream}; @@ -23,4 +24,6 @@ pub use rollups_outputs::{ RollupsOutput, RollupsOutputEnum, RollupsOutputValidityProof, RollupsOutputsStream, RollupsProof, RollupsReport, RollupsVoucher, }; -pub use rollups_stream::{DAppMetadata, DAppMetadataCLIConfig}; +pub use rollups_stream::{ + parse_stream_with_key, DAppMetadata, DAppMetadataCLIConfig, +}; diff --git a/offchain/rollups-events/src/rollups_claims.rs b/offchain/rollups-events/src/rollups_claims.rs index 69cb2a486..a08b10cac 100644 --- a/offchain/rollups-events/src/rollups_claims.rs +++ b/offchain/rollups-events/src/rollups_claims.rs @@ -3,28 +3,9 @@ use serde::{Deserialize, Serialize}; -use crate::{Address, BrokerStream, Hash}; +use crate::{rollups_stream::decl_broker_stream, Address, Hash}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} +decl_broker_stream!(RollupsClaimsStream, RollupsClaim, "rollups-claim"); /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/offchain/rollups-events/src/rollups_outputs.rs b/offchain/rollups-events/src/rollups_outputs.rs index 
57c8bc0c3..a485391e7 100644 --- a/offchain/rollups-events/src/rollups_outputs.rs +++ b/offchain/rollups-events/src/rollups_outputs.rs @@ -9,7 +9,7 @@ use crate::{rollups_stream::decl_broker_stream, Address, Hash, Payload}; decl_broker_stream!(RollupsOutputsStream, RollupsOutput, "rollups-outputs"); -/// Cartesi output +/// Cartesi output #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RollupsOutput { AdvanceResult(RollupsAdvanceResult), diff --git a/offchain/rollups-events/src/rollups_stream.rs b/offchain/rollups-events/src/rollups_stream.rs index 7dd677fd3..7d9eb9d97 100644 --- a/offchain/rollups-events/src/rollups_stream.rs +++ b/offchain/rollups-events/src/rollups_stream.rs @@ -63,15 +63,38 @@ impl From for DAppMetadata { } } +pub fn parse_stream_with_key(key: String, inner_key: &str) -> (u64, Address) { + let mut re = r"^\{chain-([^:]+):dapp-([^}]+)\}:".to_string(); + re.push_str(inner_key); + re.push_str("$"); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let chain_id = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let address = caps.get(2).unwrap().as_str().to_string(); + let address = + serde_json::from_value(serde_json::Value::String(address)).unwrap(); + + return (chain_id, address); +} + /// Declares a struct that implements the BrokerStream interface /// The generated key has the format `{chain-:dapp-}:`. /// The curly braces define a hash tag to ensure that all of a dapp's streams /// are located in the same node when connected to a Redis cluster. macro_rules! decl_broker_stream { ($stream: ident, $payload: ty, $key: literal) => { - #[derive(Debug)] + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct $stream { key: String, + pub chain_id: u64, + pub dapp_address: Address, } impl crate::broker::BrokerStream for $stream { @@ -82,15 +105,31 @@ macro_rules! 
decl_broker_stream { } } + impl crate::broker::BrokerMultiStream for $stream { + fn from_key(key: String) -> Self { + let (chain_id, dapp_address) = + crate::parse_stream_with_key(key.clone(), $key); + Self { + key: key, + chain_id: chain_id, + dapp_address: dapp_address, + } + } + } + impl $stream { pub fn new(metadata: &crate::rollups_stream::DAppMetadata) -> Self { + let chain_id = metadata.chain_id; + let dapp_address = metadata.dapp_address.clone(); Self { key: format!( "{{chain-{}:dapp-{}}}:{}", - metadata.chain_id, - hex::encode(metadata.dapp_address.inner()), + chain_id, + hex::encode(dapp_address.inner()), $key ), + chain_id: chain_id, + dapp_address: dapp_address, } } } @@ -102,7 +141,7 @@ pub(crate) use decl_broker_stream; #[cfg(test)] mod tests { use super::*; - use crate::ADDRESS_SIZE; + use crate::{broker::BrokerMultiStream, BrokerStream, ADDRESS_SIZE}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -119,4 +158,21 @@ mod tests { let stream = MockStream::new(&metadata); assert_eq!(stream.key, "{chain-123:dapp-fafafafafafafafafafafafafafafafafafafafa}:rollups-mock"); } + + #[test] + fn it_parses_the_key() { + let metadata = DAppMetadata { + chain_id: 123, + dapp_address: Address::new([0xfe; ADDRESS_SIZE]), + }; + + let stream = MockStream::new(&metadata); + let expected = "{chain-123:dapp-fefefefefefefefefefefefefefefefefefefefe}:rollups-mock"; + let key = stream.key().to_string(); + assert_eq!(expected, &key); + + let stream = MockStream::from_key(key); + assert_eq!(metadata.chain_id, stream.chain_id); + assert_eq!(metadata.dapp_address, stream.dapp_address); + } } diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 6bfaa9cc8..bf0c3dcc8 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -1,6 +1,9 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: 
Apache-2.0 (see LICENSE) +use std::collections::HashMap; +use std::time::{Duration, Instant}; + use backoff::ExponentialBackoff; use redis::aio::ConnectionManager; use redis::streams::StreamRangeReply; @@ -11,8 +14,8 @@ use testcontainers::{ }; use rollups_events::{ - Broker, BrokerConfig, BrokerEndpoint, BrokerError, BrokerStream, - RedactedUrl, Url, INITIAL_ID, + Broker, BrokerConfig, BrokerEndpoint, BrokerError, BrokerMultiStream, + BrokerStream, RedactedUrl, Url, INITIAL_ID, }; const STREAM_KEY: &'static str = "test-stream"; @@ -286,3 +289,141 @@ async fn test_it_does_not_block_when_consuming_empty_stream() { .expect("failed to peek"); assert!(matches!(event, None)); } + +// ------------------------------------------------------------------------------------------------ + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct AnotherMockStream { + key: String, + a: u8, + b: u8, +} + +impl AnotherMockStream { + fn new(a: u8, b: u8) -> Self { + let key = format!("{{a-{}:b-{}}}:{}", a, b, STREAM_KEY); + Self { key, a, b } + } +} + +impl BrokerStream for AnotherMockStream { + type Payload = MockPayload; + + fn key(&self) -> &str { + &self.key + } +} + +impl BrokerMultiStream for AnotherMockStream { + fn from_key(key: String) -> Self { + let re = r"^\{a-([^:]+):b-([^}]+)\}:test-stream$".to_string(); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let a = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let b = caps + .get(2) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + + Self { key, a, b } + } +} + +#[test_log::test(tokio::test)] +async fn test_it_consumes_from_multiple_streams() { + let docker = Cli::default(); + let state = TestState::setup(&docker).await; + let mut broker = state.create_broker().await; + + // Creates the map of streams to last-consumed-ids. 
+ let mut streams = HashMap::new(); + let initial_id = INITIAL_ID.to_string(); + streams.insert(AnotherMockStream::new(1, 2), initial_id.clone()); + streams.insert(AnotherMockStream::new(3, 4), initial_id.clone()); + streams.insert(AnotherMockStream::new(5, 6), initial_id.clone()); + + // Produces N events for each stream using the broker struct. + const N: usize = 3; + for stream in streams.keys() { + for i in 0..N { + let data = format!("{}{}{}", stream.a, stream.b, i); + let payload = MockPayload { data }; + let _ = broker + .produce(stream, payload) + .await + .expect("failed to produce events"); + } + } + + // Consumes all events using the broker struct. + let mut counters = HashMap::new(); + for _ in 0..(N * streams.len()) { + let (stream, event) = broker + .consume_blocking_from_multiple_streams(streams.clone()) + .await + .expect("failed to consume"); + + let i = counters + .entry(stream.clone()) + .and_modify(|n| *n += 1) + .or_insert(0) + .clone(); + + // Asserts that the payload is correct. + let expected = format!("{}{}{}", stream.a, stream.b, i); + assert_eq!(expected, event.payload.data); + + // Updates the map of streams with the last consumed id. + let replaced = streams.insert(stream, event.id); + // And asserts that the key from the map was indeed overwritten. + assert!(replaced.is_some()); + } + + // Asserts that N events were consumed from each stream. + for counter in counters.values() { + assert_eq!(N - 1, *counter); + } + + // Gets one of the streams. + let stream = streams.clone().into_keys().next().unwrap(); + let expected_stream = stream.clone(); + + // Sets a thread to produce an event in that stream after WAIT seconds. 
+ const WAIT: u64 = 2; + let mut producer_broker = broker.clone(); + let handler = tokio::spawn(async move { + let duration = Duration::from_secs(WAIT); + tokio::time::sleep(duration).await; + let data = "final event".to_string(); + let payload = MockPayload { data }; + let _ = producer_broker + .produce(&stream, payload) + .await + .expect("failed to produce the final event"); + }); + + // Consumes the final event. + let marker = Instant::now(); + let (final_stream, _) = broker + .consume_blocking_from_multiple_streams(streams) + .await + .expect("failed to consume the final event"); + + // Asserts that the main thread blocked for at least WAIT seconds. + assert!(marker.elapsed().as_secs() >= WAIT); + + // Asserts that the event came from the correct stream. + assert_eq!(expected_stream, final_stream); + + handler.await.expect("failed to wait handler"); +} diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index 95b9ce19b..3ab8d1d0a 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -1,6 +1,8 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +use std::collections::HashMap; + use backoff::ExponentialBackoff; use rollups_events::{ Address, Broker, BrokerConfig, BrokerEndpoint, DAppMetadata, Event, @@ -16,6 +18,40 @@ use tokio::sync::Mutex; const CHAIN_ID: u64 = 0; const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); const CONSUME_TIMEOUT: usize = 10_000; // ms + // +async fn start_redis( + docker: &Cli, +) -> (Container, BrokerEndpoint, Mutex) { + tracing::trace!("starting redis docker container"); + let image = GenericImage::new("redis", "6.2").with_wait_for( + WaitFor::message_on_stdout("Ready to accept connections"), + ); + let node = docker.run(image); + let port = node.get_host_port_ipv4(6379); + let endpoint = BrokerEndpoint::Single( + Url::parse(&format!("redis://127.0.0.1:{}", port)) + 
.map(RedactedUrl::new) + .expect("failed to parse Redis Url"), + ); + + let backoff = ExponentialBackoff::default(); + let config = BrokerConfig { + redis_endpoint: endpoint.clone(), + consume_timeout: CONSUME_TIMEOUT, + backoff, + }; + + tracing::trace!(?endpoint, "connecting to redis with rollups_events crate"); + let client = Mutex::new( + Broker::new(config) + .await + .expect("failed to connect to broker"), + ); + + (node, endpoint, client) +} + +// ------------------------------------------------------------------------------------------------ pub struct BrokerFixture<'d> { _node: Container<'d, GenericImage>, @@ -33,51 +69,23 @@ impl BrokerFixture<'_> { pub async fn setup(docker: &Cli) -> BrokerFixture<'_> { tracing::info!("setting up redis fixture"); - tracing::trace!("starting redis docker container"); - let image = GenericImage::new("redis", "6.2").with_wait_for( - WaitFor::message_on_stdout("Ready to accept connections"), - ); - let node = docker.run(image); - let port = node.get_host_port_ipv4(6379); - let redis_endpoint = BrokerEndpoint::Single( - Url::parse(&format!("redis://127.0.0.1:{}", port)) - .map(RedactedUrl::new) - .expect("failed to parse Redis Url"), - ); - let chain_id = CHAIN_ID; - let dapp_address = DAPP_ADDRESS; - let backoff = ExponentialBackoff::default(); + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + let metadata = DAppMetadata { - chain_id, - dapp_address: dapp_address.clone(), - }; - let inputs_stream = RollupsInputsStream::new(&metadata); - let claims_stream = RollupsClaimsStream::new(metadata.chain_id); - let outputs_stream = RollupsOutputsStream::new(&metadata); - let config = BrokerConfig { - redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, - backoff, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS.clone(), }; - tracing::trace!( - ?redis_endpoint, - "connecting to redis with rollups_events crate" - ); - let client = Mutex::new( - Broker::new(config) - .await - 
.expect("failed to connect to broker"), - ); BrokerFixture { - _node: node, - client, - inputs_stream, - claims_stream, - outputs_stream, + _node: redis_node, + client: redis_client, + inputs_stream: RollupsInputsStream::new(&metadata), + claims_stream: RollupsClaimsStream::new(&metadata), + outputs_stream: RollupsOutputsStream::new(&metadata), redis_endpoint, - chain_id, - dapp_address, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS, } } @@ -246,3 +254,91 @@ impl BrokerFixture<'_> { .expect("failed to produce output"); } } + +// ------------------------------------------------------------------------------------------------ + +pub struct ClaimerMultidappBrokerFixture<'d> { + _node: Container<'d, GenericImage>, + client: Mutex, + redis_endpoint: BrokerEndpoint, + claims_streams: HashMap, +} + +impl ClaimerMultidappBrokerFixture<'_> { + #[tracing::instrument(level = "trace", skip_all)] + pub async fn setup( + docker: &Cli, + dapps: Vec<(u64, Address)>, + ) -> ClaimerMultidappBrokerFixture<'_> { + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + + let claims_streams = dapps + .into_iter() + .map(|(chain_id, dapp_address)| { + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address: dapp_address.clone(), + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); + (dapp_address, stream) + }) + .collect::>(); + let claims_streams = HashMap::from_iter(claims_streams); + + ClaimerMultidappBrokerFixture { + _node: redis_node, + client: redis_client, + redis_endpoint, + claims_streams, + } + } + + pub fn redis_endpoint(&self) -> &BrokerEndpoint { + &self.redis_endpoint + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn set_dapps(&self, dapps: Vec
) { + self.client.lock().await.set_dapps(dapps).await.unwrap() + } + + // Different from the default function, + // this one requires `rollups_claim.dapp_address` to be set, + // and to match one of the addresses from the streams. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { + tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); + + let stream = self + .claims_streams + .get(&rollups_claim.dapp_address) + .unwrap() + .clone(); + + { + let last_claim = self + .client + .lock() + .await + .peek_latest(&stream) + .await + .expect("failed to get latest claim"); + let epoch_index = match last_claim { + Some(event) => event.payload.epoch_index + 1, + None => 0, + }; + assert_eq!( + rollups_claim.epoch_index, epoch_index, + "invalid epoch index", + ); + } + + self.client + .lock() + .await + .produce(&stream, rollups_claim) + .await + .expect("failed to produce claim"); + } +}