diff --git a/offchain/Cargo.lock b/offchain/Cargo.lock index 65f3decd8..9f990e30f 100644 --- a/offchain/Cargo.lock +++ b/offchain/Cargo.lock @@ -4451,6 +4451,7 @@ dependencies = [ "prometheus-client", "redacted", "redis", + "regex", "serde", "serde_json", "snafu 0.8.2", diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index 43c475d20..6af3c5902 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -3,8 +3,10 @@ use async_trait::async_trait; use snafu::ResultExt; -use std::fmt::Debug; -use tracing::{info, trace}; +use std::{collections::HashMap, fmt::Debug}; +use tracing::{debug, info}; + +use rollups_events::Address; use crate::{ checker::DuplicateChecker, listener::BrokerListener, @@ -31,6 +33,9 @@ pub enum ClaimerError< D: DuplicateChecker, T: TransactionSender, > { + #[snafu(display("invalid app address {:?}", app_address))] + InvalidAppAddress { app_address: Address }, + #[snafu(display("broker listener error"))] BrokerListenerError { source: B::Error }, @@ -45,8 +50,8 @@ pub enum ClaimerError< // DefaultClaimer // ------------------------------------------------------------------------------------------------ -/// The `DefaultClaimer` must be injected with a -/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`. +/// The `DefaultClaimer` must be injected with a `BrokerListener`, a map of `Address` to +/// `DuplicateChecker`, and a `TransactionSender`. #[derive(Debug)] pub struct DefaultClaimer< B: BrokerListener, @@ -54,7 +59,7 @@ pub struct DefaultClaimer< T: TransactionSender, > { broker_listener: B, - duplicate_checker: D, + duplicate_checkers: HashMap, transaction_sender: T, } @@ -63,12 +68,12 @@ impl { pub fn new( broker_listener: B, - duplicate_checker: D, + duplicate_checkers: HashMap, transaction_sender: T, ) -> Self { Self { broker_listener, - duplicate_checker, + duplicate_checkers, transaction_sender, } } @@ -84,25 +89,37 @@ where type Error = ClaimerError; async fn start(mut self) -> Result<(), Self::Error> { - trace!("Starting the authority claimer loop"); + debug!("Starting the authority claimer loop"); loop { - let rollups_claim = self + // Listens for claims. + let (app_address, rollups_claim) = self .broker_listener .listen() .await .context(BrokerListenerSnafu)?; - trace!("Got a claim from the broker: {:?}", rollups_claim); + debug!("Got a claim from the broker: {:?}", rollups_claim); - let is_duplicated_rollups_claim = self - .duplicate_checker + // Gets the duplicate checker. + let duplicate_checker = self + .duplicate_checkers + .get_mut(&app_address) + .ok_or(ClaimerError::InvalidAppAddress { + app_address: app_address.clone(), + })?; + + // Checks for duplicates. + let is_duplicated_rollups_claim = duplicate_checker .is_duplicated_rollups_claim(&rollups_claim) .await .context(DuplicatedClaimSnafu)?; + + // If it is a duplicate, the loop continues. if is_duplicated_rollups_claim { - trace!("It was a duplicated claim"); + debug!("It was a duplicated claim"); continue; } + // Sends the claim. info!("Sending a new rollups claim"); self.transaction_sender = self .transaction_sender diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index 944efb4b4..d29b411de 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -9,6 +9,8 @@ pub mod metrics; pub mod sender; pub mod signer; +use std::collections::HashMap; + use config::Config; use snafu::Error; use tracing::trace; @@ -53,9 +55,12 @@ pub async fn run(config: Config) -> Result<(), Box> { .await?; // Creating the claimer loop. + let mut duplicate_checkers = HashMap::new(); + duplicate_checkers + .insert(rollups_events::Address::default(), duplicate_checker); // TODO let claimer = DefaultClaimer::new( broker_listener, - duplicate_checker, + duplicate_checkers, transaction_sender, ); let claimer_handle = claimer.start(); diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 3f50a1cb1..c1f827ecc 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -3,19 +3,18 @@ use async_trait::async_trait; use rollups_events::{ - Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, + Address, Broker, BrokerConfig, BrokerError, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, }; use snafu::ResultExt; -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; -/// The `BrokerListener` listens for new claims from the broker +/// The `BrokerListener` listens for new claims from the broker. #[async_trait] pub trait BrokerListener: Debug { type Error: snafu::Error + 'static; - /// Listen to claims - async fn listen(&mut self) -> Result; + async fn listen(&mut self) -> Result<(Address, RollupsClaim), Self::Error>; } // ------------------------------------------------------------------------------------------------ @@ -25,8 +24,9 @@ pub trait BrokerListener: Debug { #[derive(Debug)] pub struct DefaultBrokerListener { broker: Broker, - stream: RollupsClaimsStream, - last_claim_id: String, + + // (chain_id, address) => last_claim_id + last_claim_ids: HashMap<(u64, Address), String>, } #[derive(Debug, snafu::Snafu)] @@ -38,16 +38,23 @@ pub enum BrokerListenerError { impl DefaultBrokerListener { pub async fn new( broker_config: BrokerConfig, - chain_id: u64, + streams: Vec<(u64, Address)>, // Vec<(chain_id, app_address)> ) -> Result { tracing::trace!("Connecting to the broker ({:?})", broker_config); let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); - let last_claim_id = INITIAL_ID.to_string(); + + // (address, chain_id) => last_claim_id + let mut last_claim_ids: HashMap<(u64, Address), String> = + HashMap::new(); + let initial_id = INITIAL_ID.to_string(); + for (chain_id, app_address) in streams { + last_claim_ids + .insert((chain_id, app_address.clone()), initial_id.clone()); + } + Ok(Self { broker, - stream, - last_claim_id, + last_claim_ids, }) } } @@ -56,17 +63,35 @@ impl DefaultBrokerListener { impl BrokerListener for DefaultBrokerListener { type Error = BrokerListenerError; - async fn listen(&mut self) -> Result { - tracing::trace!("Waiting for claim with id {}", self.last_claim_id); - let event = self + async fn listen(&mut self) -> Result<(Address, RollupsClaim), Self::Error> { + // TODO + // tracing::trace!("Waiting for claim with id {}", self.last_claim_id); + + let mut streams: Vec = vec![]; + let mut last_ids: Vec = vec![]; + for ((chain_id, app_address), last_id) in self.last_claim_ids.iter() { + let chain_id = *chain_id; + let app_address = app_address.clone(); + streams.push(RollupsClaimsStream::new(chain_id, app_address)); + last_ids.push(last_id.clone()); + } + + let (key, event) = self .broker - .consume_blocking(&self.stream, &self.last_claim_id) + .consume_blocking_from_multiple_streams(streams, last_ids) .await .context(BrokerSnafu)?; - self.last_claim_id = event.id; + let (chain_id, app_address): (u64, Address) = + RollupsClaimsStream::parse_key(key); + + let replaced = self + .last_claim_ids + .insert((chain_id, app_address.clone()), event.id); + assert!(replaced.is_some()); - Ok(event.payload) + let rollups_claim = event.payload; + Ok((app_address, rollups_claim)) } } diff --git a/offchain/rollups-events/Cargo.toml b/offchain/rollups-events/Cargo.toml index 7eda7413b..d784f8024 100644 --- a/offchain/rollups-events/Cargo.toml +++ b/offchain/rollups-events/Cargo.toml @@ -12,6 +12,7 @@ base64.workspace = true clap = { workspace = true, features = ["derive", "env"] } hex.workspace = true prometheus-client.workspace = true +regex.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true snafu.workspace = true diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index fa622a3d1..507c271b7 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -242,7 +242,6 @@ impl Broker { } } } - /// Consume the next event in stream without blocking /// This function returns None if there are no more remaining events. /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. @@ -280,6 +279,74 @@ impl Broker { Ok(None) } } + + // TODO + #[tracing::instrument(level = "trace", skip_all)] + async fn _consume_blocking_from_multiple_streams( + &mut self, + streams: &Vec, + last_consumed_ids: &Vec, + ) -> Result<(S, Event), BrokerError> { + let reply = retry(self.backoff.clone(), || async { + let stream_keys: Vec = streams + .iter() + .map(|stream| stream.key().to_string()) + .collect(); + + let opts = StreamReadOptions::default() + .count(1) + .block(self.consume_timeout); + let reply: StreamReadReply = self + .connection + .clone() + .xread_options(&stream_keys, &last_consumed_ids, &opts) + .await?; + + Ok(reply) + }) + .await + .context(ConnectionSnafu)?; + + tracing::trace!("checking for timeout"); + if reply.keys.is_empty() { + return Err(BrokerError::ConsumeTimeout); + } + + tracing::trace!("checking if any events were received"); + for mut stream_key in reply.keys { + if let Some(event) = stream_key.ids.pop() { + tracing::trace!("parsing received event"); + let stream = S::from_key(stream_key.key); + let event = event.try_into()?; + return Ok((stream, event)); + } + } + return Err(BrokerError::FailedToConsume); + } + + // TODO + #[tracing::instrument(level = "trace", skip_all)] + pub async fn consume_blocking_from_multiple_streams( + &mut self, + streams: Vec, + last_consumed_ids: Vec, + ) -> Result<(S, Event), BrokerError> { + assert!(streams.len() == last_consumed_ids.len()); + loop { + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; + + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } + } } /// Custom implementation of Debug because ConnectionManager doesn't implement debug @@ -295,6 +362,7 @@ impl fmt::Debug for Broker { pub trait BrokerStream { type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq; fn key(&self) -> &str; + fn from_key(key: String) -> Self; } /// Event that goes through the broker diff --git a/offchain/rollups-events/src/lib.rs b/offchain/rollups-events/src/lib.rs index 8c75b8ff8..fa05140dd 100644 --- a/offchain/rollups-events/src/lib.rs +++ b/offchain/rollups-events/src/lib.rs @@ -23,4 +23,6 @@ pub use rollups_outputs::{ RollupsOutput, RollupsOutputEnum, RollupsOutputValidityProof, RollupsOutputsStream, RollupsProof, RollupsReport, RollupsVoucher, }; -pub use rollups_stream::{DAppMetadata, DAppMetadataCLIConfig}; +pub use rollups_stream::{ + parse_stream_with_key, DAppMetadata, DAppMetadataCLIConfig, +}; diff --git a/offchain/rollups-events/src/rollups_claims.rs b/offchain/rollups-events/src/rollups_claims.rs index 69cb2a486..a08b10cac 100644 --- a/offchain/rollups-events/src/rollups_claims.rs +++ b/offchain/rollups-events/src/rollups_claims.rs @@ -3,28 +3,9 @@ use serde::{Deserialize, Serialize}; -use crate::{Address, BrokerStream, Hash}; +use crate::{rollups_stream::decl_broker_stream, Address, Hash}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} +decl_broker_stream!(RollupsClaimsStream, RollupsClaim, "rollups-claim"); /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/offchain/rollups-events/src/rollups_outputs.rs b/offchain/rollups-events/src/rollups_outputs.rs index 57c8bc0c3..a485391e7 100644 --- a/offchain/rollups-events/src/rollups_outputs.rs +++ b/offchain/rollups-events/src/rollups_outputs.rs @@ -9,7 +9,7 @@ use crate::{rollups_stream::decl_broker_stream, Address, Hash, Payload}; decl_broker_stream!(RollupsOutputsStream, RollupsOutput, "rollups-outputs"); -/// Cartesi output +/// Cartesi output #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum RollupsOutput { AdvanceResult(RollupsAdvanceResult), diff --git a/offchain/rollups-events/src/rollups_stream.rs b/offchain/rollups-events/src/rollups_stream.rs index 7dd677fd3..c94fd46a0 100644 --- a/offchain/rollups-events/src/rollups_stream.rs +++ b/offchain/rollups-events/src/rollups_stream.rs @@ -63,6 +63,27 @@ impl From for DAppMetadata { } } +pub fn parse_stream_with_key(key: String, inner_key: &str) -> (u64, Address) { + let mut re = r"^\{chain-([^:]+):dapp-([^}]+)\}:".to_string(); + re.push_str(inner_key); + re.push_str("$"); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let chain_id = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let address = caps.get(2).unwrap().as_str().to_string(); + let address = + serde_json::from_value(serde_json::Value::String(address)).unwrap(); + + return (chain_id, address); +} + /// Declares a struct that implements the BrokerStream interface /// The generated key has the format `{chain-:dapp-}:`. /// The curly braces define a hash tag to ensure that all of a dapp's streams @@ -72,6 +93,8 @@ macro_rules! decl_broker_stream { #[derive(Debug)] pub struct $stream { key: String, + pub chain_id: u64, + pub dapp_address: Address, } impl crate::broker::BrokerStream for $stream { @@ -80,17 +103,31 @@ macro_rules! decl_broker_stream { fn key(&self) -> &str { &self.key } + + fn from_key(key: String) -> Self { + let (chain_id, dapp_address) = + crate::parse_stream_with_key(key.clone(), $key); + Self { + key: key, + chain_id: chain_id, + dapp_address: dapp_address, + } + } } impl $stream { pub fn new(metadata: &crate::rollups_stream::DAppMetadata) -> Self { + let chain_id = metadata.chain_id; + let dapp_address = metadata.dapp_address.clone(); Self { key: format!( "{{chain-{}:dapp-{}}}:{}", - metadata.chain_id, - hex::encode(metadata.dapp_address.inner()), + chain_id, + hex::encode(dapp_address.inner()), $key ), + chain_id: chain_id, + dapp_address: dapp_address, } } } @@ -102,7 +139,7 @@ pub(crate) use decl_broker_stream; #[cfg(test)] mod tests { use super::*; - use crate::ADDRESS_SIZE; + use crate::{BrokerStream, ADDRESS_SIZE}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -119,4 +156,21 @@ mod tests { let stream = MockStream::new(&metadata); assert_eq!(stream.key, "{chain-123:dapp-fafafafafafafafafafafafafafafafafafafafa}:rollups-mock"); } + + #[test] + fn it_parses_the_key() { + let metadata = DAppMetadata { + chain_id: 123, + dapp_address: Address::new([0xfe; ADDRESS_SIZE]), + }; + + let stream = MockStream::new(&metadata); + let expected = "{chain-123:dapp-fefefefefefefefefefefefefefefefefefefefe}:rollups-mock"; + let key = stream.key().to_string(); + assert_eq!(expected, &key); + + let stream = MockStream::from_key(key); + assert_eq!(metadata.chain_id, stream.chain_id); + assert_eq!(metadata.dapp_address, stream.dapp_address); + } } diff --git a/offchain/rollups-events/tests/integration.rs b/offchain/rollups-events/tests/integration.rs index 6bfaa9cc8..5ba89726b 100644 --- a/offchain/rollups-events/tests/integration.rs +++ b/offchain/rollups-events/tests/integration.rs @@ -76,6 +76,10 @@ impl BrokerStream for MockStream { fn key(&self) -> &str { STREAM_KEY } + + fn from_key(_: String) -> Self { + unimplemented!() + } } #[test_log::test(tokio::test)] @@ -286,3 +290,113 @@ async fn test_it_does_not_block_when_consuming_empty_stream() { .expect("failed to peek"); assert!(matches!(event, None)); } + +// ------------------------------------------------------------------------------------------------ + +#[derive(Debug, Clone)] +struct AnotherMockStream { + key: String, + a: u8, + b: u8, +} + +impl AnotherMockStream { + fn new(a: u8, b: u8) -> Self { + Self { + key: format!("{{a-{}:b-{}}}:{}", a, b, STREAM_KEY), + a, + b, + } + } +} + +impl BrokerStream for AnotherMockStream { + type Payload = MockPayload; + + fn key(&self) -> &str { + &self.key + } + + fn from_key(key: String) -> Self { + let re = r"^\{a-([^:]+):b-([^}]+)\}:test-stream$".to_string(); + let re = regex::Regex::new(&re).unwrap(); + let caps = re.captures(&key).unwrap(); + + let a = caps + .get(1) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + let b = caps + .get(2) + .unwrap() + .as_str() + .to_string() + .parse::() + .unwrap(); + + Self { key, a, b } + } +} + +/// Event generated when the Cartesi Rollups epoch finishes +#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct MockRollupsClaim { + pub number: u64, +} + +#[test_log::test(tokio::test)] +async fn test_multiple_streams_it_consumes_events() { + // TODO: turn this into stream=>last_consumed_id map + let streams = vec![ + AnotherMockStream::new(1, 2), + AnotherMockStream::new(3, 4), + AnotherMockStream::new(5, 6), + ]; + + let docker = Cli::default(); + let state = TestState::setup(&docker).await; + let mut broker = state.create_broker().await; + + // Produce multiple events using the broker struct. + const N: usize = 3; + for stream in streams.iter() { + for i in 0..N { + let id = broker + .produce( + stream, + MockPayload { + data: format!("something{}", i), + }, + ) + .await + .expect("failed to produce events"); + println!("stream {} produced event with id {}", stream.key(), id); + } + } + + // Consume multiple events using the Broker struct. + let last_consumed_ids = vec![INITIAL_ID.to_string(); 3]; + + for i in 0..9 { + let (stream, event) = broker + .consume_blocking_from_multiple_streams( + streams.clone(), + last_consumed_ids.clone(), + ) + .await + .expect("failed to consume"); + println!("---> stream: {:?}", stream); + println!("---> event: {:?}", event); + } + // assert_eq!(event.id, format!("1-{}", i)); + // assert_eq!(event.payload.data, i.to_string()); + // last_id = event.id; + + assert!(false) +} + +#[test_log::test(tokio::test)] +async fn test_multiple_streams_it_blocks_until_event_is_produced() {}