diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b0c1a04..d0264d4c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md
@@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added support for the experimental multidapp claimer. +- Added the `DAPP_CONTRACT_ADDRESS` environment variable to the `authority-claimer`. If left unset, the service instantiates the `MultidappClaimer`, which reads dapp addresses from Redis. + ### Changed - Disabled the `authority-claimer` when `CARTESI_EXPERIMENTAL_SUNODO_VALIDATOR_ENABLED` is set to `true`.
diff --git a/internal/node/services.go b/internal/node/services.go index bf60efd51..b780950fc 100644 --- a/internal/node/services.go +++ b/internal/node/services.go
@@ -117,6 +117,8 @@ func newAuthorityClaimer(c config.NodeConfig, workDir string) services.CommandService { s.Env = append(s.Env, fmt.Sprintf("HISTORY_ADDRESS=%v", c.ContractsHistoryAddress)) s.Env = append(s.Env, fmt.Sprintf("AUTHORITY_ADDRESS=%v", c.ContractsAuthorityAddress)) s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_ADDRESS=%v", c.ContractsInputBoxAddress)) + s.Env = append(s.Env, fmt.Sprintf("DAPP_CONTRACT_ADDRESS=%v", + c.ContractsApplicationAddress)) s.Env = append(s.Env, fmt.Sprintf("GENESIS_BLOCK=%v", c.ContractsInputBoxDeploymentBlockNumber)) s.Env = append(s.Env, fmt.Sprintf("AUTHORITY_CLAIMER_HTTP_SERVER_PORT=%v",
diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index 65f3decd8..9f990e30f 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock
@@ -4451,6 +4451,7 @@ dependencies = [ "prometheus-client", "redacted", "redis", + "regex", "serde", "serde_json", "snafu 0.8.2",
diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 2610dce94..d871e1179 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs
@@ -62,7 +62,7 @@ impl BrokerFacade { let client = Broker::new(config).await.context(BrokerInternalSnafu)?; let inputs_stream = RollupsInputsStream::new(&dapp_metadata); let outputs_stream = RollupsOutputsStream::new(&dapp_metadata); - let claims_stream = RollupsClaimsStream::new(dapp_metadata.chain_id); + let claims_stream = RollupsClaimsStream::new(&dapp_metadata); Ok(Self { client, inputs_stream,
@@ -105,7 +105,8 @@ impl BrokerFacade { tracing::trace!(rollups_claim.epoch_index, ?rollups_claim.epoch_hash, - "producing rollups claim" + "producing rollups claim for stream {:?}", + self.claims_stream, ); let last_claim_event = self
diff --git a/offchain/advance-runner/src/runner.rs b/offchain/advance-runner/src/runner.rs index 81736fcc3..69b7e89c5 100644 --- a/offchain/advance-runner/src/runner.rs +++ b/offchain/advance-runner/src/runner.rs
@@ -127,11 +127,15 @@ impl Runner { .context(ProduceOutputsSnafu)?; tracing::trace!("produced outputs in broker stream"); + let dapp_address = rollups_claim.dapp_address.clone(); self.broker .produce_rollups_claim(rollups_claim) .await .context(ProduceClaimSnafu)?; - tracing::info!("produced epoch claim in broker stream"); + tracing::info!( + "produced epoch claim in broker stream for dapp address {:?}", + dapp_address + ); } Err(source) => { if let ServerManagerError::EmptyEpochError { .. } = source {
diff --git a/offchain/authority-claimer/README.md b/offchain/authority-claimer/README.md index fc91cb73c..068689fac 100644 --- a/offchain/authority-claimer/README.md +++ b/offchain/authority-claimer/README.md
@@ -2,3 +2,28 @@ This service submits rollups claims consumed from the broker to the blockchain using the [tx-manager crate](https://github.com/cartesi/tx-manager). It runs at the end of every epoch, when new claims are inserted on the broker. + +### Multi-dapp Mode + +(This is an **experimental** feature! Don't try it unless you know exactly what you are doing!) + +The `authority-claimer` can be configured to run in "multidapp mode". +To do so, the `DAPP_CONTRACT_ADDRESS` environment variable must be left unset. +This will force the claimer to instantiate a `MultidappBrokerListener` instead of a `DefaultBrokerListener`. + +In multidapp mode, the claimer reads claims from the broker for multiple applications. +All dapps must share the same History contract and the same chain ID. + +Instead of using environment variables, + the claimer will get the list of application addresses from Redis, + through the `experimental-dapp-addresses-config` key. +This key holds a Redis Set value. +You must use commands such as SADD and SREM to manipulate the list of addresses. +Addresses are encoded as hex strings without the leading `"0x"`. +Redis values are case-sensitive, so addresses must be in lowercase format. +Example address value: `"0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a"`. + +You may rewrite the list of addresses at any time; + the claimer will adjust accordingly. +The list of addresses can be empty at any time; + the claimer will wait until an application address is added to the set to resume operations.
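For concreteness, here is one way to manipulate that set programmatically with the [`redis` crate](https://crates.io/crates/redis) — a minimal sketch, assuming a local Redis instance; the key name comes from the README above, and the sample address is a placeholder:

```rust
use redis::Commands;

fn main() -> redis::RedisResult<()> {
    // Assumes a local Redis instance; adjust the URL for your deployment.
    let client = redis::Client::open("redis://127.0.0.1:6379/")?;
    let mut con = client.get_connection()?;

    const DAPPS_KEY: &str = "experimental-dapp-addresses-config";

    // Addresses are lowercase hex strings without the leading "0x".
    let dapp = "0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a";

    // SADD registers the dapp, SMEMBERS lists the set, SREM removes it.
    let _: () = con.sadd(DAPPS_KEY, dapp)?;
    let members: Vec<String> = con.smembers(DAPPS_KEY)?;
    println!("claimed dapps: {:?}", members);
    let _: () = con.srem(DAPPS_KEY, dapp)?;
    Ok(())
}
```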
diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index 43c475d20..ae9ec7b9c 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs
@@ -4,7 +4,9 @@ use async_trait::async_trait; use snafu::ResultExt; use std::fmt::Debug; -use tracing::{info, trace}; +use tracing::{debug, info}; + +use rollups_events::Address; use crate::{ checker::DuplicateChecker, listener::BrokerListener,
@@ -31,6 +33,9 @@ pub enum ClaimerError< B: BrokerListener, D: DuplicateChecker, T: TransactionSender, > { + #[snafu(display("invalid app address {:?}", app_address))] + InvalidAppAddress { app_address: Address }, + #[snafu(display("broker listener error"))] BrokerListenerError { source: B::Error },
@@ -84,14 +89,14 @@ where type Error = ClaimerError<B, D, T>; async fn start(mut self) -> Result<(), Self::Error> { - trace!("Starting the authority claimer loop"); + debug!("Starting the authority claimer loop"); loop { let rollups_claim = self .broker_listener .listen() .await .context(BrokerListenerSnafu)?; - trace!("Got a claim from the broker: {:?}", rollups_claim); + debug!("Got a claim from the broker: {:?}", rollups_claim); let is_duplicated_rollups_claim = self .duplicate_checker .is_duplicated_rollups_claim(&rollups_claim) .await .context(DuplicatedClaimSnafu)?; if is_duplicated_rollups_claim { - trace!("It was a duplicated claim"); + info!("Duplicate claim detected: {:?}", rollups_claim); continue; }
diff --git a/offchain/authority-claimer/src/config/cli.rs b/offchain/authority-claimer/src/config/cli.rs index 5df9c6ffd..db1979aeb 100644 --- a/offchain/authority-claimer/src/config/cli.rs +++ b/offchain/authority-claimer/src/config/cli.rs
@@ -6,9 +6,10 @@ use eth_tx_manager::{ config::{TxEnvCLIConfig as TxManagerCLIConfig, TxManagerConfig}, Priority, }; +use ethers::utils::hex; use log::{LogConfig, LogEnvCliConfig}; use redacted::Redacted; -use rollups_events::{BrokerCLIConfig, BrokerConfig}; +use rollups_events::{Address, BrokerCLIConfig, BrokerConfig}; use rusoto_core::Region; use snafu::ResultExt; use std::{fs, str::FromStr};
@@ -47,6 +48,10 @@ pub(crate) struct AuthorityClaimerCLI { #[command(flatten)] pub contracts_config: ContractsCLIConfig, + /// Address of the rollups dapp + #[arg(long, env)] + pub dapp_contract_address: Option<String>, + /// Genesis block for reading blockchain events #[arg(long, env, default_value_t = 1)] pub genesis_block: u64,
@@ -72,6 +77,17 @@ impl TryFrom<AuthorityClaimerCLI> for AuthorityClaimerConfig { ContractsConfig::try_from(cli_config.contracts_config) .context(ContractsSnafu)?; + let dapp_contract_address: Option<Address> = cli_config + .dapp_contract_address + .map(|raw_dapp_contract_address| { + let address: [u8; 20] = + hex::decode(&raw_dapp_contract_address[2..]) + .expect("Dapp json parse error") + .try_into() + .expect("Dapp address with wrong size"); + address.into() + }); + Ok(AuthorityClaimerConfig { tx_manager_config, tx_signing_config,
@@ -79,6 +95,7 @@ broker_config, log_config, contracts_config, + dapp_address: dapp_contract_address, genesis_block: cli_config.genesis_block, }) }
diff --git a/offchain/authority-claimer/src/config/mod.rs b/offchain/authority-claimer/src/config/mod.rs index 21d1932b2..175163188 100644 --- a/offchain/authority-claimer/src/config/mod.rs +++ b/offchain/authority-claimer/src/config/mod.rs
@@ -13,7 +13,7 @@ use eth_tx_manager::{config::TxManagerConfig, Priority}; use http_server::HttpServerConfig; use log::LogConfig; use redacted::Redacted; -use rollups_events::BrokerConfig; +use rollups_events::{Address, BrokerConfig}; use rusoto_core::Region; #[derive(Debug, Clone)]
@@ -30,6 +30,7 @@ pub struct AuthorityClaimerConfig { pub broker_config: BrokerConfig, pub log_config: LogConfig, pub contracts_config: ContractsConfig, + pub dapp_address: Option<Address>, pub genesis_block: u64, }
diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index 944efb4b4..3e6571517 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs
@@ -10,8 +10,9 @@ pub mod sender; pub mod signer; use config::Config; +use listener::{BrokerListener, MultidappBrokerListener}; use snafu::Error; -use tracing::trace; +use tracing::{info, trace}; use crate::{ checker::DefaultDuplicateChecker,
@@ -22,21 +23,39 @@ use crate::{ }; pub async fn run(config: Config) -> Result<(), Box<dyn Error>> { - // Creating the metrics and health server. let metrics = AuthorityClaimerMetrics::new(); + let dapp_address = config.authority_claimer_config.dapp_address.clone(); + if let Some(dapp_address) = dapp_address { + info!("Creating the default broker listener"); + let broker_listener = DefaultBrokerListener::new( + config.authority_claimer_config.broker_config.clone(), + config.authority_claimer_config.tx_manager_config.chain_id, + dapp_address, + ) + .await?; + _run(metrics, config, broker_listener).await + } else { + info!("Creating the multidapp broker listener"); + let broker_listener = MultidappBrokerListener::new( + config.authority_claimer_config.broker_config.clone(), + config.authority_claimer_config.tx_manager_config.chain_id, + ) + .await?; + _run(metrics, config, broker_listener).await + } +} + +async fn _run<B: BrokerListener + Send + Sync + 'static>( + metrics: AuthorityClaimerMetrics, + config: Config, + broker_listener: B, +) -> Result<(), Box<dyn Error>> { let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); let config = config.authority_claimer_config; - let chain_id = config.tx_manager_config.chain_id; - // Creating the broker listener. - trace!("Creating the broker listener"); - let broker_listener = - DefaultBrokerListener::new(config.broker_config.clone(), chain_id) - .await?; + let chain_id = config.tx_manager_config.chain_id; - // Creating the duplicate checker. trace!("Creating the duplicate checker"); let duplicate_checker = DefaultDuplicateChecker::new( config.tx_manager_config.provider_http_endpoint.clone(),
@@ -46,18 +65,18 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> { ) .await?; - // Creating the transaction sender. trace!("Creating the transaction sender"); let transaction_sender = DefaultTransactionSender::new(config.clone(), chain_id, metrics) .await?; - // Creating the claimer loop. + // Creating the claimer. let claimer = DefaultClaimer::new( broker_listener, duplicate_checker, transaction_sender, ); + let claimer_handle = claimer.start(); // Starting the HTTP server and the claimer loop.
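The `run`/`_run` split above picks a listener once and then monomorphizes the rest of the pipeline over it. As a minimal, self-contained sketch of that static-dispatch shape (all names below are illustrative, not part of this PR):

```rust
// A synchronous sketch of dispatching over a listener trait, mirroring
// how `run` chooses a listener and hands it to the generic `_run` helper.
trait Listener {
    fn listen(&mut self) -> String;
}

struct SingleDapp;
struct MultiDapp;

impl Listener for SingleDapp {
    fn listen(&mut self) -> String {
        "claim from the configured dapp".into()
    }
}

impl Listener for MultiDapp {
    fn listen(&mut self) -> String {
        "claim from one of many dapps".into()
    }
}

// The helper is generic, so each call site is monomorphized;
// no trait objects or boxing are needed.
fn run_with<L: Listener>(mut listener: L) {
    println!("{}", listener.listen());
}

fn main() {
    let dapp_address: Option<&str> = None; // unset => multidapp mode
    match dapp_address {
        Some(_) => run_with(SingleDapp),
        None => run_with(MultiDapp),
    }
}
```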
diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 3f50a1cb1..992611908 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs
@@ -3,25 +3,31 @@ use async_trait::async_trait; use rollups_events::{ - Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, + Address, Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, }; use snafu::ResultExt; -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; -/// The `BrokerListener` listens for new claims from the broker +/// The `BrokerListener` listens for new claims from the broker. #[async_trait] pub trait BrokerListener: Debug { type Error: snafu::Error + 'static; - /// Listen to claims async fn listen(&mut self) -> Result<RollupsClaim, Self::Error>; } +#[derive(Debug, snafu::Snafu)] +pub enum BrokerListenerError { + #[snafu(display("broker error"))] + BrokerError { source: BrokerError }, +} + // ------------------------------------------------------------------------------------------------ // DefaultBrokerListener // ------------------------------------------------------------------------------------------------ +/// The `DefaultBrokerListener` only listens for claims from one dapp. #[derive(Debug)] pub struct DefaultBrokerListener { broker: Broker,
@@ -29,20 +35,19 @@ pub struct DefaultBrokerListener { last_claim_id: String, } -#[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { - #[snafu(display("broker error"))] - BrokerError { source: BrokerError }, -} - impl DefaultBrokerListener { pub async fn new( broker_config: BrokerConfig, chain_id: u64, + dapp_address: Address, ) -> Result<Self, BrokerError> { tracing::trace!("Connecting to the broker ({:?})", broker_config); let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address, + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); let last_claim_id = INITIAL_ID.to_string(); Ok(Self { broker,
@@ -70,31 +75,181 @@ impl BrokerListener for DefaultBrokerListener { } } +// ------------------------------------------------------------------------------------------------ +// MultidappBrokerListener +// ------------------------------------------------------------------------------------------------ + +/// The `MultidappBrokerListener` listens for claims from multiple dapps. +/// It updates its internal list of dapps by reading the dapp address set from Redis. +#[derive(Debug)] +pub struct MultidappBrokerListener { + broker: Broker, + streams: HashMap<RollupsClaimsStream, String>, // stream => last-claim-id + buffer: HashMap<RollupsClaimsStream, RollupsClaim>, + chain_id: u64, +} + +impl MultidappBrokerListener { + pub async fn new( + broker_config: BrokerConfig, + chain_id: u64, + ) -> Result<Self, BrokerError> { + tracing::trace!( + "Connecting to the broker ({:?}) on multidapp mode", + broker_config + ); + let broker = Broker::new(broker_config).await?; + let streams = HashMap::new(); + let buffer = HashMap::new(); + Ok(Self { + broker, + streams, + buffer, + chain_id, + }) + } +} + +impl MultidappBrokerListener { + /// Reads the dapp addresses from Redis and + /// converts them into the stream to last-consumed-id map. + async fn update_streams(&mut self) -> Result<(), BrokerListenerError> { + let initial_id = INITIAL_ID.to_string(); + + // Gets the dapps from the broker. + let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?; + assert!(!dapps.is_empty()); + tracing::info!( + "Got the following dapps from key \"{}\": {:?}", + rollups_events::DAPPS_KEY, + dapps + ); + + // Converts dapps to streams. + let streams: Vec<_> = dapps + .into_iter() + .map(|dapp_address| { + RollupsClaimsStream::new(&DAppMetadata { + chain_id: self.chain_id, + dapp_address, + }) + }) + .collect(); + + // Removes obsolete dapps from the buffer, if any. + for key in self.buffer.clone().keys() { + if !streams.contains(key) { + self.buffer.remove(key); + } + } + + // Adds the last consumed ids. + let streams: Vec<_> = streams + .into_iter() + .map(|stream| { + let id = self.streams.get(&stream).unwrap_or(&initial_id); + (stream, id.to_string()) + }) + .collect(); + + self.streams = HashMap::from_iter(streams); + Ok(()) + } + + // Returns true if it succeeded in filling the buffer and false otherwise. + async fn fill_buffer(&mut self) -> Result<bool, BrokerListenerError> { + let streams_and_events = self + .broker + .consume_blocking_from_multiple_streams(self.streams.clone()) + .await; + if let Err(BrokerError::FailedToConsume) = streams_and_events { + return Ok(false); + } + + let streams_and_events = streams_and_events.context(BrokerSnafu)?; + for (stream, event) in streams_and_events { + // Updates the last-consumed-id from the stream. + let replaced = self.streams.insert(stream.clone(), event.id); + assert!(replaced.is_some()); + + let replaced = self.buffer.insert(stream, event.payload); + assert!(replaced.is_none()); + } + + Ok(true) + } +} + +#[async_trait] +impl BrokerListener for MultidappBrokerListener { + type Error = BrokerListenerError; + + async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> { + self.update_streams().await?; + + tracing::trace!("Waiting for a claim"); + if self.buffer.is_empty() { + loop { + if self.fill_buffer().await? { + break; + } else { + self.update_streams().await?; + } + } + } + + let buffer = self.buffer.clone(); + let (stream, rollups_claim) = buffer.into_iter().next().unwrap(); + self.buffer.remove(&stream); + Ok(rollups_claim) + } +}
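The `listen` implementation above refills its buffer only when it is empty, and hands out one buffered claim per call; since the buffer holds at most one claim per stream, no single dapp can monopolize the claimer between refills. A stripped-down, synchronous sketch of that fill-and-drain shape (hypothetical types, no Redis involved):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for streams and claims.
type Stream = String;
type Claim = String;

struct Listener {
    buffer: HashMap<Stream, Claim>,
}

impl Listener {
    // Stand-in for consuming at most one event per stream from the broker.
    fn fill_buffer(&mut self) {
        for s in ["a", "b", "c"] {
            self.buffer.insert(s.to_string(), format!("claim-{s}"));
        }
    }

    // Pops an arbitrary buffered claim, refilling first if needed.
    fn listen(&mut self) -> Claim {
        if self.buffer.is_empty() {
            self.fill_buffer();
        }
        let stream = self.buffer.keys().next().unwrap().clone();
        self.buffer.remove(&stream).unwrap()
    }
}

fn main() {
    let mut listener = Listener { buffer: HashMap::new() };
    for _ in 0..6 {
        println!("{}", listener.listen());
    }
}
```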
+// ------------------------------------------------------------------------------------------------ +// Tests +// ------------------------------------------------------------------------------------------------ + #[cfg(test)] mod tests { - use std::time::Duration; + use std::{collections::HashMap, time::Duration}; use testcontainers::clients::Cli; - use test_fixtures::BrokerFixture; - - use crate::listener::{BrokerListener, DefaultBrokerListener}; + use test_fixtures::{broker::ClaimerMultidappBrokerFixture, BrokerFixture}; use backoff::ExponentialBackoffBuilder; use rollups_events::{ - BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, RollupsClaim, - Url, + Address, BrokerConfig, BrokerEndpoint, BrokerError, RedactedUrl, + RollupsClaim, RollupsClaimsStream, Url, }; - // ------------------------------------------------------------------------------------------------ + use crate::listener::BrokerListener; + + use super::{DefaultBrokerListener, MultidappBrokerListener}; + + // -------------------------------------------------------------------------------------------- // Broker Mock - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- + + fn config(redis_endpoint: BrokerEndpoint) -> BrokerConfig { + BrokerConfig { + redis_endpoint, + consume_timeout: 300000, + backoff: ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(), + } + } + + // -------------------------------------------------------------------------------------------- + // DefaultListener Tests + // -------------------------------------------------------------------------------------------- - pub async fn setup_broker( + async fn setup_default_broker_listener( docker: &Cli, should_fail: bool, ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { let fixture = BrokerFixture::setup(docker).await; let redis_endpoint = if should_fail { BrokerEndpoint::Single(RedactedUrl::new( Url::parse("https://invalid.com").unwrap(),
@@ -102,21 +257,16 @@ } else { fixture.redis_endpoint().clone() }; - - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let broker = - DefaultBrokerListener::new(config, fixture.chain_id()).await?; + let broker = DefaultBrokerListener::new( + config(redis_endpoint), + fixture.chain_id(), + fixture.dapp_address().clone(), + ) + .await?; Ok((fixture, broker)) } - pub async fn produce_rollups_claims( + async fn default_produce_claims( fixture: &BrokerFixture<'_>, n: usize, epoch_index_start: usize, ) -> Vec<RollupsClaim> {
@@ -132,59 +282,57 @@ } /// The last claim should trigger an `EndError` error. - pub async fn produce_last_claim( + async fn default_produce_last_claim( fixture: &BrokerFixture<'_>, epoch_index: usize, ) -> Vec<RollupsClaim> { - produce_rollups_claims(fixture, 1, epoch_index).await + default_produce_claims(fixture, 1, epoch_index).await } - // ------------------------------------------------------------------------------------------------ - // Listener Unit Tests - // ------------------------------------------------------------------------------------------------ + // -------------------------------------------------------------------------------------------- #[tokio::test] - async fn instantiate_new_broker_listener_ok() { + async fn instantiate_new_default_broker_listener_ok() { let docker = Cli::default(); - let _ = setup_broker(&docker, false).await; + let _ = setup_default_broker_listener(&docker, false).await; } #[tokio::test] - async fn instantiate_new_broker_listener_error() { + async fn instantiate_new_default_broker_listener_error() { let docker = Cli::default(); - let result = setup_broker(&docker, true).await; - assert!(result.is_err(), "setup_broker didn't fail as it should"); + let result = setup_default_broker_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); let error = result.err().unwrap().to_string(); assert_eq!(error, "error connecting to Redis"); } #[tokio::test] - async fn start_broker_listener_with_one_claim_enqueued() { + async fn start_default_broker_listener_with_one_claim_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; + default_produce_claims(&fixture, n, 0).await; + default_produce_last_claim(&fixture, n).await; let result = broker_listener.listen().await; assert!(result.is_ok()); } #[tokio::test] - async fn start_broker_listener_with_claims_enqueued() { + async fn start_default_broker_listener_with_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; + setup_default_broker_listener(&docker, false).await.unwrap(); + default_produce_last_claim(&fixture, 0).await; let claim = broker_listener.listen().await; assert!(claim.is_ok()); } #[tokio::test] - async fn start_broker_listener_listener_with_no_claims_enqueued() { + async fn start_default_broker_listener_with_no_claims_enqueued() { let docker = Cli::default(); let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); + setup_default_broker_listener(&docker, false).await.unwrap(); let n = 7; let broker_listener_thread = tokio::spawn(async move {
@@ -198,18 +346,478 @@ let x = 2; println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; + default_produce_claims(&fixture, x, 0).await; println!("Going to sleep for 2 seconds."); tokio::time::sleep(Duration::from_secs(2)).await; let y = 5; println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; + default_produce_claims(&fixture, y, x).await; assert_eq!(x + y, n); - produce_last_claim(&fixture, n).await; + default_produce_last_claim(&fixture, n).await; + + broker_listener_thread.await.unwrap(); + } + + // -------------------------------------------------------------------------------------------- + // MultidappListener Tests + // -------------------------------------------------------------------------------------------- + + async fn setup_multidapp_listener( + docker: &Cli, + should_fail: bool, + ) -> Result< + ( + ClaimerMultidappBrokerFixture, + MultidappBrokerListener, + Vec<Address>, + ), + BrokerError, + > { + let chain_id: u64 = 0; + let dapp_addresses: Vec<Address> = vec![ + [3; 20].into(), // + [5; 20].into(), // + [10; 20].into(), // + ]; + let dapps: Vec<_> = dapp_addresses + .clone() + .into_iter() + .map(|dapp_address| (chain_id, dapp_address)) + .collect(); + + let fixture = + ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await; + fixture.dapps_set(dapp_addresses.clone()).await; + + let redis_endpoint = if should_fail { + BrokerEndpoint::Single(RedactedUrl::new( + Url::parse("https://invalid.com").unwrap(), + )) + } else { + fixture.redis_endpoint().clone() + }; + + let listener = + MultidappBrokerListener::new(config(redis_endpoint), chain_id) + .await?; + Ok((fixture, listener, dapp_addresses)) + } + + // For each index in indexes, this function produces a claim + // with rollups_claim.dapp_address = dapps[index] + // and rollups_claim.epoch_index = epochs[index]. + // It then increments epochs[index]. + async fn multidapp_produce_claims( + fixture: &ClaimerMultidappBrokerFixture<'_>, + epochs: &mut Vec<u64>, + dapps: &Vec<Address>, + indexes: &Vec<usize>, + ) { + for &index in indexes { + let epoch = *epochs.get(index).unwrap(); + + let mut rollups_claim = RollupsClaim::default(); + rollups_claim.dapp_address = dapps.get(index).unwrap().clone(); + rollups_claim.epoch_index = epoch; + fixture.produce_rollups_claim(rollups_claim.clone()).await; + + epochs[index] = epoch + 1; + } + } + + // Asserts that listener.listen() will return indexes.len() claims, + // and that for each index in indexes + // there is a unique claim for which claim.dapp_address = dapps[index]. + async fn assert_listen( + listener: &mut MultidappBrokerListener, + dapps: &Vec<Address>, + indexes: &Vec<usize>, + ) { + let mut dapps: Vec<_> = indexes + .iter() + .map(|&index| dapps.get(index).unwrap().clone()) + .collect(); + for _ in indexes.clone() { + println!("--- Listening..."); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp = result.unwrap().dapp_address; + + let index = dapps.iter().position(|expected| *expected == dapp); + assert!(index.is_some()); + println!("--- Listened for a claim from {:?}", dapp); + dapps.remove(index.unwrap()); + } + assert!(dapps.is_empty()); + } + + fn streams_to_vec( + streams: &HashMap<RollupsClaimsStream, String>, + ) -> Vec<Address> { + streams + .keys() + .into_iter() + .map(|stream| stream.dapp_address.clone()) + .collect::<Vec<_>>() + } + + fn assert_eq_vec(mut v1: Vec<Address>, mut v2: Vec<Address>) { + assert_eq!(v1.len(), v2.len()); + while !v1.is_empty() { + let e1 = v1.pop().unwrap(); + let e2 = v2.pop().unwrap(); + assert_eq!(e1, e2); + } + } + + // -------------------------------------------------------------------------------------------- + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_ok() { + let docker = Cli::default(); + let _ = setup_multidapp_listener(&docker, false).await; + } + + #[tokio::test] + async fn instantiate_multidapp_broker_listener_error() { + let docker = Cli::default(); + let result = setup_multidapp_listener(&docker, true).await; + assert!(result.is_err(), "setup didn't fail as it should"); + let error = result.err().unwrap().to_string(); + assert_eq!(error, "error connecting to Redis"); + } + + #[tokio::test] + async fn multidapp_listen_with_no_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.dapps_set(vec![]).await; + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![0, 1, 2]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + + let thread = tokio::spawn(async move { + let _ = listener.listen().await; + unreachable!(); + }); + let result = tokio::time::timeout(Duration::from_secs(3), thread).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn multidapp_listen_with_one_dapp() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.dapps_set(vec![dapps.get(0).unwrap().clone()]).await; + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 1, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &vec![0]).await; + } + + #[tokio::test] + async fn multidapp_listen_with_duplicate_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.dapps_set(vec![]).await; + + // Initializes with 0 addresses in the set. + assert_eq!(0, fixture.dapps_members().await.len()); + + // We add a lowercase and an uppercase version of the same address. + let dapp: Address = [10; 20].into(); + fixture.dapps_add(dapp.to_string().to_lowercase()).await; + fixture.dapps_add(dapp.to_string().to_uppercase()).await; + + // We now have 2 addresses in the set. + assert_eq!(2, fixture.dapps_members().await.len()); + + // We then produce some claims and listen for them. + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + let indexes = vec![2, 2]; + assert_listen(&mut listener, &dapps, &indexes).await; + + // Now we have 1 address because one of the duplicates got deleted.
+ assert_eq!(1, fixture.dapps_members().await.len()); + } + + #[tokio::test] + async fn multidapp_listen_with_changing_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let first_batch_dapps = vec![ + dapps.get(0).unwrap().clone(), // + ]; + let second_batch_dapps = vec![ + dapps.get(0).unwrap().clone(), // + dapps.get(1).unwrap().clone(), // + ]; + let third_batch_dapps = vec![ + dapps.get(0).unwrap().clone(), // + dapps.get(1).unwrap().clone(), // + dapps.get(2).unwrap().clone(), // + ]; + let fourth_batch_dapps = vec![ + dapps.get(2).unwrap().clone(), // + ]; + + let mut epochs = vec![0; dapps.len()]; + let first_batch = vec![0, 0]; + let second_batch = vec![1, 0]; + let third_batch = vec![2, 1, 0]; + let fourth_batch = vec![2]; + + { + println!("=== Producing the first batch of claims."); + multidapp_produce_claims( + &fixture, + &mut epochs, + &dapps, + &first_batch, + ) + .await; + println!("=== Epochs: {:?}", epochs); + + println!("--- Setting dapps..."); + fixture.dapps_set(first_batch_dapps.clone()).await; + assert_listen(&mut listener, &dapps, &first_batch).await; + let mut dapps = streams_to_vec(&listener.streams); + dapps.sort(); + println!("--- Current dapps: {:?}", dapps); + assert_eq_vec(first_batch_dapps, dapps); + println!("--- All good with the first batch!"); + } + + { + println!("=== Producing the second batch of claims."); + multidapp_produce_claims( + &fixture, + &mut epochs, + &dapps, + &second_batch, + ) + .await; + println!("=== Epochs: {:?}", epochs); + + println!("--- Setting dapps..."); + fixture.dapps_set(second_batch_dapps.clone()).await; + assert_listen(&mut listener, &dapps, &second_batch).await; + let mut dapps = streams_to_vec(&listener.streams); + dapps.sort(); + println!("--- Current dapps: {:?}", dapps); + assert_eq_vec(second_batch_dapps, dapps); + println!("--- All good with the second batch!"); + } + + { + println!("=== Producing the third batch of claims."); + multidapp_produce_claims( + &fixture, + &mut epochs, + &dapps, + &third_batch, + ) + .await; + println!("=== Epochs: {:?}", epochs); + + println!("--- Setting dapps..."); + fixture.dapps_set(third_batch_dapps.clone()).await; + assert_listen(&mut listener, &dapps, &third_batch).await; + let mut dapps = streams_to_vec(&listener.streams); + dapps.sort(); + println!("--- Current dapps: {:?}", dapps); + assert_eq_vec(third_batch_dapps, dapps); + println!("--- All good with the third batch!"); + } + + { + println!("=== Producing the fourth batch of claims."); + multidapp_produce_claims( + &fixture, + &mut epochs, + &dapps, + &fourth_batch, + ) + .await; + println!("=== Epochs: {:?}", epochs); + + println!("--- Setting dapps..."); + fixture.dapps_set(fourth_batch_dapps.clone()).await; + assert_listen(&mut listener, &dapps, &fourth_batch).await; + let mut dapps = streams_to_vec(&listener.streams); + dapps.sort(); + println!("--- Current dapps: {:?}", dapps); + assert_eq_vec(fourth_batch_dapps, dapps); + println!("--- All good with the fourth batch!"); + } + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let index = 0; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &vec![index]) + .await; + + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result); + + let expected_dapp = 
dapps.get(index).unwrap().clone(); + let actual_dapp = result.unwrap().dapp_address; + assert_eq!(expected_dapp, actual_dapp); + } + + #[tokio::test] + async fn multidapp_listen_with_multiple_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 1, 2]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_one_claim_for_each_dapp_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 1, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + assert_listen(&mut listener, &dapps, &indexes).await; + } + + #[tokio::test] + async fn multidapp_listen_with_no_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let first_batch = vec![0, 1, 2, 0]; + let second_batch = vec![2, 1, 0, 0, 2, 1]; + + let broker_listener_thread = { + let _dapps = dapps.clone(); + let _first_batch = first_batch.clone(); + let _second_batch = second_batch.clone(); + tokio::spawn(async move { + println!("Spawned the broker-listener thread."); + assert_listen(&mut listener, &_dapps, &_first_batch).await; + println!("All good with the first batch!"); + assert_listen(&mut listener, &_dapps, &_second_batch).await; + println!("All good with the second batch!"); + }) + }; + + println!("Going to sleep for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + + println!("Producing the first batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &first_batch) + .await; + println!("Epochs: {:?}", epochs); + + println!("Going to sleep for 2 seconds."); + tokio::time::sleep(Duration::from_secs(2)).await; + + println!("Producing the second batch of claims."); + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &second_batch) + .await; + println!("Epochs: {:?}", epochs); broker_listener_thread.await.unwrap(); } + + #[tokio::test] + async fn multidapp_listen_buffer_order() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![1, 1, 1, 1, 1, 2, 1, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + + let mut buffers = vec![ + vec![0, 1, 2], // + vec![1, 2], + vec![1], + vec![1], + vec![1], + vec![1], + ]; + + for buffer in buffers.iter_mut() { + for _ in 0..buffer.len() { + println!("Buffer: {:?}", buffer); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp_address = result.unwrap().dapp_address; + let index = dapps + .iter() + .position(|address| *address == dapp_address) + .unwrap(); + let index = buffer.iter().position(|i| *i == index).unwrap(); + buffer.remove(index); + } + assert!(buffer.is_empty()); + println!("Emptied one of the buffers"); + } + } + + #[tokio::test] + async fn multidapp_listen_buffer_change() { + let docker = Cli::default(); + let (fixture, mut listener, mut dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + + let mut epochs 
= vec![0; dapps.len()]; + let indexes = vec![2, 2, 2, 0, 1, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + + // Removes the last dapp. + assert!(dapps.pop().is_some()); + fixture.dapps_set(dapps.clone()).await; + + let mut buffers = vec![ + vec![0, 1], // + vec![0], + ]; + + for buffer in buffers.iter_mut() { + for _ in 0..buffer.len() { + println!("Buffer: {:?}", buffer); + let result = listener.listen().await; + assert!(result.is_ok(), "{:?}", result.unwrap_err()); + let dapp_address = result.unwrap().dapp_address; + let index = dapps + .iter() + .position(|address| *address == dapp_address) + .unwrap(); + let index = buffer.iter().position(|i| *i == index).unwrap(); + buffer.remove(index); + } + assert!(buffer.is_empty()); + println!("Emptied one of the buffers"); + } + } }
diff --git a/offchain/rollups-events/Cargo.toml b/offchain/rollups-events/Cargo.toml index 7eda7413b..d784f8024 100644 --- a/offchain/rollups-events/Cargo.toml +++ b/offchain/rollups-events/Cargo.toml
@@ -12,6 +12,7 @@ base64.workspace = true clap = { workspace = true, features = ["derive", "env"] } hex.workspace = true prometheus-client.workspace = true +regex.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true snafu.workspace = true
diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index fa622a3d1..529d1d1ee 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs
@@ -14,14 +14,20 @@ use redis::{ }; use serde::{de::DeserializeOwned, Serialize}; use snafu::{ResultExt, Snafu}; +use std::collections::HashMap; +use std::convert::identity; use std::fmt; +use std::str::FromStr; use std::time::Duration; pub use redacted::{RedactedUrl, Url}; +use crate::Address; + pub mod indexer; pub const INITIAL_ID: &str = "0"; +pub const DAPPS_KEY: &str = "experimental-dapp-addresses-config"; /// The `BrokerConnection` enum implements the `ConnectionLike` trait /// to satisfy the `AsyncCommands` trait bounds.
@@ -280,6 +286,164 @@ impl Broker { Ok(None) } } + + #[tracing::instrument(level = "trace", skip_all)] + async fn _consume_blocking_from_multiple_streams<S: BrokerMultiStream>( + &mut self, + streams: &Vec<S>, + last_consumed_ids: &Vec<String>, + ) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> { + let reply = retry(self.backoff.clone(), || async { + let stream_keys: Vec<String> = streams + .iter() + .map(|stream| stream.key().to_string()) + .collect(); + + let opts = StreamReadOptions::default() + .count(1) + .block(self.consume_timeout); + let reply: StreamReadReply = self + .connection + .clone() + .xread_options(&stream_keys, &last_consumed_ids, &opts) + .await?; + + Ok(reply) + }) + .await + .context(ConnectionSnafu)?; + + tracing::trace!("checking for timeout"); + if reply.keys.is_empty() { + return Err(BrokerError::ConsumeTimeout); + } + + tracing::trace!("getting the consumed events"); + let mut response: Vec<(S, Event<S::Payload>)> = vec![]; + for mut stream_key in reply.keys { + tracing::trace!("parsing stream key {:?}", stream_key); + if let Some(event) = stream_key.ids.pop() { + tracing::trace!("parsing received event"); + let stream = S::from_key(stream_key.key); + let event = event.try_into()?; + response.push((stream, event)); + } + } + if response.is_empty() { + Err(BrokerError::FailedToConsume) + } else { + Ok(response) + } + } + + /// Consume the next event from one of the streams. + /// + /// This function blocks until a new event is available in one of the streams. + /// It times out with `BrokerError::FailedToConsume`. + /// + /// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking_from_multiple_streams< + S: BrokerMultiStream, + >( + &mut self, + streams: HashMap<S, String>, // streams to last-consumed-ids + ) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> { + let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = + streams.into_iter().map(identity).unzip(); + + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; + + if let Err(BrokerError::ConsumeTimeout) = result { + Err(BrokerError::FailedToConsume) + } else { + result + } + }
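Under the hood this relies on Redis' `XREAD`, which takes several stream keys plus one last-consumed ID per key and returns entries from whichever streams have data past those IDs. A hedged sketch of the equivalent call with the `redis` crate (`streams` feature enabled; key names and IDs below are placeholders):

```rust
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::Commands;

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379/")?;
    let mut con = client.get_connection()?;

    // One key and one last-consumed ID per stream; "0" mirrors INITIAL_ID.
    let keys = [
        "{chain-1:dapp-aa}:rollups-claims",
        "{chain-1:dapp-bb}:rollups-claims",
    ];
    let ids = ["0", "0"];

    // COUNT 1 returns at most one entry per stream; BLOCK waits up to 5s.
    let opts = StreamReadOptions::default().count(1).block(5_000);
    let reply: StreamReadReply = con.xread_options(&keys, &ids, &opts)?;

    for stream_key in reply.keys {
        for entry in stream_key.ids {
            println!("{}: {}", stream_key.key, entry.id);
        }
    }
    Ok(())
}
```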
+ #[tracing::instrument(level = "trace", skip_all)] + pub async fn _get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> { + let reply = retry(self.backoff.clone(), || async { + tracing::trace!(key = DAPPS_KEY, "getting key"); + let reply: Vec<String> = + self.connection.clone().smembers(DAPPS_KEY).await?; + + let mut dapp_addresses: Vec<Address> = vec![]; + for value in reply { + let normalized = value.to_lowercase(); + let dapp_address = Address::from_str(&normalized).unwrap(); + if dapp_addresses.contains(&dapp_address) { + let _: () = + self.connection.clone().srem(DAPPS_KEY, value).await?; + } else { + dapp_addresses.push(dapp_address); + } + } + + Ok(dapp_addresses) + }) + .await + .context(ConnectionSnafu)?; + + if reply.is_empty() { + Err(BrokerError::ConsumeTimeout) + } else { + Ok(reply) + } + } + + /// Gets the dapp addresses. + pub async fn get_dapps(&mut self) -> Result<Vec<Address>, BrokerError> { + loop { + let result = self._get_dapps().await; + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } + + /// Sets the dapp addresses. + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_set(&mut self, dapp_addresses: Vec<Address>) { + tracing::trace!(key = DAPPS_KEY, "setting key"); + let _: () = self.connection.clone().del(DAPPS_KEY).await.unwrap(); + for dapp_address in dapp_addresses { + let _: () = self + .connection + .clone() + .sadd(DAPPS_KEY, dapp_address.to_string()) + .await + .unwrap(); + } + } + + /// Adds a dapp address (as a string). + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&mut self, dapp_address: String) { + tracing::trace!(dapp = dapp_address, "adding dapp"); + self.connection + .clone() + .sadd(DAPPS_KEY, dapp_address) + .await + .unwrap() + } + + /// Gets the dapp addresses as strings. + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&mut self) -> Vec<String> { + tracing::trace!("getting dapps members"); + self.connection.clone().smembers(DAPPS_KEY).await.unwrap() + } } /// Custom implementation of Debug because ConnectionManager doesn't implement debug
@@ -297,6 +461,10 @@ pub trait BrokerStream { fn key(&self) -> &str; } +pub trait BrokerMultiStream: BrokerStream { + fn from_key(key: String) -> Self; +} + /// Event that goes through the broker #[derive(Debug, Clone, Eq, PartialEq)] pub struct Event {
diff --git a/offchain/rollups-events/src/common.rs b/offchain/rollups-events/src/common.rs index 1665896f7..31fa2ce19 100644 --- a/offchain/rollups-events/src/common.rs +++ b/offchain/rollups-events/src/common.rs
@@ -6,6 +6,7 @@ use prometheus_client::encoding::EncodeLabelValue; use prometheus_client::encoding::LabelValueEncoder; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::Write; +use std::str::FromStr; pub const ADDRESS_SIZE: usize = 20; pub const HASH_SIZE: usize = 32;
@@ -13,7 +14,7 @@ const PAYLOAD_DEBUG_MAX_LEN: usize = 100; /// A binary array that is converted to a hex string when serialized -#[derive(Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] pub struct HexArray<const N: usize>([u8; N]); impl<const N: usize> HexArray<N> {
@@ -40,6 +41,14 @@ impl<const N: usize> From<[u8; N]> for HexArray<N> { } } +impl<const N: usize> FromStr for HexArray<N> { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + serde_json::from_value(serde_json::Value::String(s.to_string())) + } +} + impl<const N: usize> Serialize for HexArray<N> { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where
@@ -69,6 +78,16 @@ impl<'de, const N: usize> Deserialize<'de> for HexArray<N> { } } +impl<const N: usize> ToString for HexArray<N> { + fn to_string(&self) -> String { + let s = serde_json::to_string(self).unwrap(); + let mut chars = s.chars(); + chars.next(); + chars.next_back(); + chars.as_str().to_string() + } +} + impl<const N: usize> std::fmt::Debug for HexArray<N> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", hex::encode(self.inner()))
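The `ToString` implementation above goes through `serde_json` and then trims the surrounding quotes of the resulting JSON string. A tiny self-contained sketch of that quote-trimming trick (the sample string is a placeholder):

```rust
// serde_json serializes a string with surrounding quotes, which the
// implementation strips by dropping the first and last characters.
fn main() {
    let s = serde_json::to_string("0a0a0a").unwrap(); // "\"0a0a0a\""
    let mut chars = s.chars();
    chars.next(); // drop the leading quote
    chars.next_back(); // drop the trailing quote
    assert_eq!(chars.as_str(), "0a0a0a");
}
```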
diff --git a/offchain/rollups-events/src/lib.rs b/offchain/rollups-events/src/lib.rs index 8c75b8ff8..b660c7bdd 100644 --- a/offchain/rollups-events/src/lib.rs +++ b/offchain/rollups-events/src/lib.rs
@@ -10,7 +10,8 @@ mod rollups_stream; pub use broker::{ indexer, Broker, BrokerCLIConfig, BrokerConfig, BrokerEndpoint, - BrokerError, BrokerStream, Event, RedactedUrl, Url, INITIAL_ID, + BrokerError, BrokerMultiStream, BrokerStream, Event, RedactedUrl, Url, + DAPPS_KEY, INITIAL_ID, }; pub use common::{Address, Hash, Payload, ADDRESS_SIZE, HASH_SIZE}; pub use rollups_claims::{RollupsClaim, RollupsClaimsStream};
@@ -23,4 +24,6 @@ pub use rollups_outputs::{ RollupsOutput, RollupsOutputEnum, RollupsOutputValidityProof, RollupsOutputsStream, RollupsProof, RollupsReport, RollupsVoucher, }; -pub use rollups_stream::{DAppMetadata, DAppMetadataCLIConfig}; +pub use rollups_stream::{ + parse_stream_with_key, DAppMetadata, DAppMetadataCLIConfig, +};
diff --git a/offchain/rollups-events/src/rollups_claims.rs b/offchain/rollups-events/src/rollups_claims.rs index 69cb2a486..93b9a856a 100644 --- a/offchain/rollups-events/src/rollups_claims.rs +++ b/offchain/rollups-events/src/rollups_claims.rs
@@ -3,28 +3,9 @@ use serde::{Deserialize, Serialize}; -use crate::{Address, BrokerStream, Hash}; +use crate::{rollups_stream::decl_broker_stream, Address, Hash}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} +decl_broker_stream!(RollupsClaimsStream, RollupsClaim, "rollups-claims"); /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
diff --git a/offchain/rollups-events/src/rollups_outputs.rs b/offchain/rollups-events/src/rollups_outputs.rs index 57c8bc0c3..a485391e7 100644 --- a/offchain/rollups-events/src/rollups_outputs.rs +++ b/offchain/rollups-events/src/rollups_outputs.rs
@@ -9,7 +9,7 @@ use crate::{rollups_stream::decl_broker_stream, Address, Hash, Payload}; decl_broker_stream!(RollupsOutputsStream, RollupsOutput, "rollups-outputs"); -/// Cartesi output +/// Cartesi output #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RollupsOutput { AdvanceResult(RollupsAdvanceResult),
diff --git a/offchain/rollups-events/src/rollups_stream.rs b/offchain/rollups-events/src/rollups_stream.rs index 7dd677fd3..7d9eb9d97 100644 --- a/offchain/rollups-events/src/rollups_stream.rs +++ b/offchain/rollups-events/src/rollups_stream.rs
@@ -63,15 +63,38 @@ impl From<DAppMetadataCLIConfig> for DAppMetadata { } } +pub fn parse_stream_with_key(key: String, inner_key: &str) -> (u64, Address) { + let mut re = r"^\{chain-([^:]+):dapp-([^}]+)\}:".to_string(); + re.push_str(inner_key); + re.push_str("$"); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let chain_id = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::<u64>() + .unwrap(); + let address = caps.get(2).unwrap().as_str().to_string(); + let address = + serde_json::from_value(serde_json::Value::String(address)).unwrap(); + + return (chain_id, address); +}
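As a quick illustration of the key round trip (the key shape and regex mirror `parse_stream_with_key` above; the address is a placeholder):

```rust
// Build a claims key, then parse it back with the same regex shape.
fn main() {
    let chain_id = 123u64;
    let dapp = "fafafafafafafafafafafafafafafafafafafafa";
    let key = format!("{{chain-{}:dapp-{}}}:rollups-claims", chain_id, dapp);

    let re = regex::Regex::new(r"^\{chain-([^:]+):dapp-([^}]+)\}:rollups-claims$")
        .unwrap();
    let caps = re.captures(&key).unwrap();
    assert_eq!(caps.get(1).unwrap().as_str().parse::<u64>().unwrap(), chain_id);
    assert_eq!(caps.get(2).unwrap().as_str(), dapp);
}
```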
/// Declares a struct that implements the BrokerStream interface /// The generated key has the format `{chain-<chain_id>:dapp-<dapp_address>}:<key>`. /// The curly braces define a hash tag to ensure that all of a dapp's streams /// are located in the same node when connected to a Redis cluster. macro_rules! decl_broker_stream { ($stream: ident, $payload: ty, $key: literal) => { - #[derive(Debug)] + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct $stream { key: String, + pub chain_id: u64, + pub dapp_address: Address, } impl crate::broker::BrokerStream for $stream {
@@ -82,15 +105,31 @@ macro_rules! decl_broker_stream { } } + impl crate::broker::BrokerMultiStream for $stream { + fn from_key(key: String) -> Self { + let (chain_id, dapp_address) = + crate::parse_stream_with_key(key.clone(), $key); + Self { + key: key, + chain_id: chain_id, + dapp_address: dapp_address, + } + } + } + impl $stream { pub fn new(metadata: &crate::rollups_stream::DAppMetadata) -> Self { + let chain_id = metadata.chain_id; + let dapp_address = metadata.dapp_address.clone(); Self { key: format!( "{{chain-{}:dapp-{}}}:{}", - metadata.chain_id, - hex::encode(metadata.dapp_address.inner()), + chain_id, + hex::encode(dapp_address.inner()), $key ), + chain_id: chain_id, + dapp_address: dapp_address, } } }
@@ -102,7 +141,7 @@ pub(crate) use decl_broker_stream; #[cfg(test)] mod tests { use super::*; - use crate::ADDRESS_SIZE; + use crate::{broker::BrokerMultiStream, BrokerStream, ADDRESS_SIZE}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@@ -119,4 +158,21 @@ let stream = MockStream::new(&metadata); assert_eq!(stream.key, "{chain-123:dapp-fafafafafafafafafafafafafafafafafafafafa}:rollups-mock"); } + + #[test] + fn it_parses_the_key() { + let metadata = DAppMetadata { + chain_id: 123, + dapp_address: Address::new([0xfe; ADDRESS_SIZE]), + }; + + let stream = MockStream::new(&metadata); + let expected = "{chain-123:dapp-fefefefefefefefefefefefefefefefefefefefe}:rollups-mock"; + let key = stream.key().to_string(); + assert_eq!(expected, &key); + + let stream = MockStream::from_key(key); + assert_eq!(metadata.chain_id, stream.chain_id); + assert_eq!(metadata.dapp_address, stream.dapp_address); + }
diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 6bfaa9cc8..daf67e905 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs
@@ -1,6 +1,8 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +use std::collections::HashMap; + use backoff::ExponentialBackoff; use redis::aio::ConnectionManager; use redis::streams::StreamRangeReply; use testcontainers::{ }; use rollups_events::{ - Broker, BrokerConfig, BrokerEndpoint, BrokerError, BrokerStream, - RedactedUrl, Url, INITIAL_ID, + Broker, BrokerConfig, BrokerEndpoint, BrokerError, BrokerMultiStream, + BrokerStream, RedactedUrl, Url, INITIAL_ID, }; const STREAM_KEY: &'static str = "test-stream";
@@ -286,3 +288,133 @@ .expect("failed to peek"); assert!(matches!(event, None)); } + +// ------------------------------------------------------------------------------------------------ + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct AnotherMockStream { + key: String, + a: u8, + b: u8, +} + +impl AnotherMockStream { + fn new(a: u8, b: u8) -> Self { + let key = format!("{{a-{}:b-{}}}:{}", a, b, STREAM_KEY); + Self { key, a, b } + } +} + +impl BrokerStream for AnotherMockStream { + type Payload = MockPayload; + + fn key(&self) -> &str { + &self.key + } +} + +impl BrokerMultiStream for AnotherMockStream { + fn from_key(key: String) -> Self { + let re = r"^\{a-([^:]+):b-([^}]+)\}:test-stream$".to_string(); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let a = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::<u8>() + .unwrap(); + let b = caps + .get(2) + .unwrap() + .as_str() + .to_string() + .parse::<u8>() + .unwrap(); +
Self { key, a, b } + } +} + +#[test_log::test(tokio::test)] +async fn test_it_consumes_from_multiple_streams() { + let docker = Cli::default(); + let state = TestState::setup(&docker).await; + let mut broker = state.create_broker().await; + + // Creates the map of streams to last-consumed-ids. + let mut streams = HashMap::new(); + let initial_id = INITIAL_ID.to_string(); + streams.insert(AnotherMockStream::new(1, 2), initial_id.clone()); + streams.insert(AnotherMockStream::new(3, 4), initial_id.clone()); + streams.insert(AnotherMockStream::new(5, 6), initial_id.clone()); + + // Produces N events for each stream using the broker struct. + const N: usize = 3; + for stream in streams.keys() { + for i in 0..N { + let data = format!("{}{}{}", stream.a, stream.b, i); + let payload = MockPayload { data }; + let _ = broker + .produce(stream, payload) + .await + .expect("failed to produce events"); + } + } + + // Consumes all events using the broker struct. + let mut counters = HashMap::new(); + for _ in 0..streams.len() { + let streams_and_events = broker + .consume_blocking_from_multiple_streams(streams.clone()) + .await + .expect("failed to consume"); + + for (stream, event) in streams_and_events { + let i = counters + .entry(stream.clone()) + .and_modify(|n| *n += 1) + .or_insert(0) + .clone(); + + // Asserts that the payload is correct. + let expected = format!("{}{}{}", stream.a, stream.b, i); + assert_eq!(expected, event.payload.data); + + // Updates the map of streams with the last consumed id. + let replaced = streams.insert(stream, event.id); + // And asserts that the key from the map was indeed overwritten. + assert!(replaced.is_some()); + } + } + + // Asserts that N events were consumed from each stream. + for counter in counters.values() { + assert_eq!(N - 1, *counter); + } + + // Gets one of the streams. + let stream = streams.clone().into_keys().next().unwrap(); + let expected_stream = stream.clone(); + + // Produces the final event. + let data = "final event".to_string(); + let payload = MockPayload { data }; + let _ = broker + .produce(&stream, payload) + .await + .expect("failed to produce the final event"); + + // Consumes the final event. + let mut streams_and_events = broker + .consume_blocking_from_multiple_streams(streams) + .await + .expect("failed to consume the final event"); + assert_eq!(1, streams_and_events.len()); + let (final_stream, _) = streams_and_events.pop().unwrap(); + + // Asserts that the event came from the correct stream. 
+ assert_eq!(expected_stream, final_stream); +}
diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index 95b9ce19b..a384b757f 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs
@@ -1,6 +1,8 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +use std::collections::HashMap; + use backoff::ExponentialBackoff; use rollups_events::{ Address, Broker, BrokerConfig, BrokerEndpoint, DAppMetadata, Event,
@@ -17,6 +19,40 @@ const CHAIN_ID: u64 = 0; const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); const CONSUME_TIMEOUT: usize = 10_000; // ms +async fn start_redis( + docker: &Cli, +) -> (Container<GenericImage>, BrokerEndpoint, Mutex<Broker>) { + tracing::trace!("starting redis docker container"); + let image = GenericImage::new("redis", "6.2").with_wait_for( + WaitFor::message_on_stdout("Ready to accept connections"), + ); + let node = docker.run(image); + let port = node.get_host_port_ipv4(6379); + let endpoint = BrokerEndpoint::Single( + Url::parse(&format!("redis://127.0.0.1:{}", port)) + .map(RedactedUrl::new) + .expect("failed to parse Redis Url"), + ); + + let backoff = ExponentialBackoff::default(); + let config = BrokerConfig { + redis_endpoint: endpoint.clone(), + consume_timeout: CONSUME_TIMEOUT, + backoff, + }; + + tracing::trace!(?endpoint, "connecting to redis with rollups_events crate"); + let client = Mutex::new( + Broker::new(config) + .await + .expect("failed to connect to broker"), + ); + + (node, endpoint, client) +} + +// ------------------------------------------------------------------------------------------------ + pub struct BrokerFixture<'d> { _node: Container<'d, GenericImage>, client: Mutex<Broker>,
@@ -33,51 +69,23 @@ impl BrokerFixture<'_> { pub async fn setup(docker: &Cli) -> BrokerFixture<'_> { tracing::info!("setting up redis fixture"); - tracing::trace!("starting redis docker container"); - let image = GenericImage::new("redis", "6.2").with_wait_for( - WaitFor::message_on_stdout("Ready to accept connections"), - ); - let node = docker.run(image); - let port = node.get_host_port_ipv4(6379); - let redis_endpoint = BrokerEndpoint::Single( - Url::parse(&format!("redis://127.0.0.1:{}", port)) - .map(RedactedUrl::new) - .expect("failed to parse Redis Url"), - ); - let chain_id = CHAIN_ID; - let dapp_address = DAPP_ADDRESS; - let backoff = ExponentialBackoff::default(); + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + let metadata = DAppMetadata { - chain_id, - dapp_address: dapp_address.clone(), - }; - let inputs_stream = RollupsInputsStream::new(&metadata); - let claims_stream = RollupsClaimsStream::new(metadata.chain_id); - let outputs_stream = RollupsOutputsStream::new(&metadata); - let config = BrokerConfig { - redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, - backoff, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS.clone(), }; - tracing::trace!( - ?redis_endpoint, - "connecting to redis with rollups_events crate" - ); - let client = Mutex::new( - Broker::new(config) - .await - .expect("failed to connect to broker"), - ); BrokerFixture { - _node: node, - client, - inputs_stream, - claims_stream, - outputs_stream, + _node: redis_node, + client: redis_client, + inputs_stream: RollupsInputsStream::new(&metadata), + claims_stream: RollupsClaimsStream::new(&metadata), + outputs_stream: RollupsOutputsStream::new(&metadata), redis_endpoint, - chain_id, - dapp_address, + chain_id: CHAIN_ID, + dapp_address: DAPP_ADDRESS, } }
@@ -246,3 +254,101 @@ impl BrokerFixture<'_> { .expect("failed to produce output"); } } + +// ------------------------------------------------------------------------------------------------ + +pub struct ClaimerMultidappBrokerFixture<'d> { + _node: Container<'d, GenericImage>, + client: Mutex<Broker>, + redis_endpoint: BrokerEndpoint, + claims_streams: HashMap<Address, RollupsClaimsStream>, +} + +impl ClaimerMultidappBrokerFixture<'_> { + #[tracing::instrument(level = "trace", skip_all)] + pub async fn setup( + docker: &Cli, + dapps: Vec<(u64, Address)>, + ) -> ClaimerMultidappBrokerFixture<'_> { + let (redis_node, redis_endpoint, redis_client) = + start_redis(&docker).await; + + let claims_streams = dapps + .into_iter() + .map(|(chain_id, dapp_address)| { + let dapp_metadata = DAppMetadata { + chain_id, + dapp_address: dapp_address.clone(), + }; + let stream = RollupsClaimsStream::new(&dapp_metadata); + (dapp_address, stream) + }) + .collect::<Vec<_>>(); + let claims_streams = HashMap::from_iter(claims_streams); + + ClaimerMultidappBrokerFixture { + _node: redis_node, + client: redis_client, + redis_endpoint, + claims_streams, + } + } + + pub fn redis_endpoint(&self) -> &BrokerEndpoint { + &self.redis_endpoint + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_set(&self, dapps: Vec<Address>) { + self.client.lock().await.dapps_set(dapps).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&self, dapp: String) { + self.client.lock().await.dapps_add(dapp).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&self) -> Vec<String> { + self.client.lock().await.dapps_members().await + } + + // Different from the default function, + // this one requires `rollups_claim.dapp_address` to be set, + // and to match one of the addresses from the streams. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { + tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); + + let stream = self + .claims_streams + .get(&rollups_claim.dapp_address) + .unwrap() + .clone(); + + { + let last_claim = self + .client + .lock() + .await + .peek_latest(&stream) + .await + .expect("failed to get latest claim"); + let epoch_index = match last_claim { + Some(event) => event.payload.epoch_index + 1, + None => 0, + }; + assert_eq!( + rollups_claim.epoch_index, epoch_index, + "invalid epoch index", + ); + } + + self.client + .lock() + .await + .produce(&stream, rollups_claim) + .await + .expect("failed to produce claim"); + } +}