diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 87bca4690..d75625d70 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -21,9 +21,6 @@ pub trait BrokerListener: Debug { pub enum BrokerListenerError { #[snafu(display("broker error"))] BrokerError { source: BrokerError }, - - #[snafu(display("no applications configured"))] - NoApplicationsConfigured, } // ------------------------------------------------------------------------------------------------ @@ -121,9 +118,7 @@ impl MultidappBrokerListener { // Gets the dapps from the broker. let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?; - if dapps.is_empty() { - return Err(BrokerListenerError::NoApplicationsConfigured); - } + assert!(!dapps.is_empty()); tracing::info!( "Got the following dapps from key \"{}\": {:?}", rollups_events::DAPPS_KEY, @@ -161,13 +156,17 @@ impl MultidappBrokerListener { Ok(()) } - async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> { - let streams_and_events: Vec<_> = self + // Returns true if it succeeded in filling the buffer and false otherwise. + async fn fill_buffer(&mut self) -> Result { + let streams_and_events = self .broker .consume_blocking_from_multiple_streams(self.streams.clone()) - .await - .context(BrokerSnafu)?; + .await; + if let Err(BrokerError::FailedToConsume) = streams_and_events { + return Ok(false); + } + let streams_and_events = streams_and_events.context(BrokerSnafu)?; for (stream, event) in streams_and_events { // Updates the last-consumed-id from the stream. let replaced = self.streams.insert(stream.clone(), event.id); @@ -177,7 +176,7 @@ impl MultidappBrokerListener { assert!(replaced.is_none()); } - Ok(()) + Ok(true) } } @@ -190,7 +189,13 @@ impl BrokerListener for MultidappBrokerListener { tracing::trace!("Waiting for a claim"); if self.buffer.is_empty() { - self.fill_buffer().await?; + loop { + if self.fill_buffer().await? { + break; + } else { + self.update_streams().await?; + } + } } let buffer = self.buffer.clone(); @@ -206,6 +211,7 @@ impl BrokerListener for MultidappBrokerListener { #[cfg(test)] mod tests { + use serial_test::serial; use std::{collections::HashMap, time::Duration}; use testcontainers::clients::Cli; @@ -217,7 +223,7 @@ mod tests { RollupsClaim, RollupsClaimsStream, Url, }; - use crate::listener::{BrokerListener, BrokerListenerError}; + use crate::listener::BrokerListener; use super::{DefaultBrokerListener, MultidappBrokerListener}; @@ -373,9 +379,9 @@ mod tests { > { let chain_id: u64 = 0; let dapp_addresses: Vec
= vec![ - [3; 20].into(), // - [5; 20].into(), // - [7; 20].into(), // + [3; 20].into(), // + [5; 20].into(), // + [10; 20].into(), // ]; let dapps: Vec<_> = dapp_addresses .clone() @@ -385,7 +391,7 @@ mod tests { let fixture = ClaimerMultidappBrokerFixture::setup(docker, dapps.clone()).await; - fixture.set_dapps(dapp_addresses.clone()).await; + fixture.dapps_set(dapp_addresses.clone()).await; let redis_endpoint = if should_fail { BrokerEndpoint::Single(RedactedUrl::new( @@ -490,16 +496,17 @@ mod tests { let docker = Cli::default(); let (fixture, mut listener, dapps) = setup_multidapp_listener(&docker, false).await.unwrap(); - fixture.set_dapps(vec![]).await; + fixture.dapps_set(vec![]).await; let mut epochs = vec![0; dapps.len()]; let indexes = vec![0, 1, 2]; multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; - let result = listener.listen().await; + + let thread = tokio::spawn(async move { + let _ = listener.listen().await; + unreachable!(); + }); + let result = tokio::time::timeout(Duration::from_secs(3), thread).await; assert!(result.is_err()); - assert_eq!( - BrokerListenerError::NoApplicationsConfigured.to_string(), - result.unwrap_err().to_string() - ); } #[tokio::test] @@ -507,13 +514,43 @@ mod tests { let docker = Cli::default(); let (fixture, mut listener, dapps) = setup_multidapp_listener(&docker, false).await.unwrap(); - fixture.set_dapps(vec![dapps.get(0).unwrap().clone()]).await; + fixture.dapps_set(vec![dapps.get(0).unwrap().clone()]).await; let mut epochs = vec![0; dapps.len()]; let indexes = vec![2, 1, 1, 2, 0]; multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; assert_listen(&mut listener, &dapps, &vec![0]).await; } + #[serial] + #[tokio::test] + async fn multidapp_listen_with_duplicate_dapps() { + let docker = Cli::default(); + let (fixture, mut listener, dapps) = + setup_multidapp_listener(&docker, false).await.unwrap(); + fixture.dapps_set(vec![]).await; + + // Initializes with 0 addresses in the set. + assert_eq!(0, fixture.dapps_members().await.len()); + + // We add a lowercase and an uppercase version of the same address. + let dapp: Address = [10; 20].into(); + fixture.dapps_add(dapp.to_string().to_lowercase()).await; + fixture.dapps_add(dapp.to_string().to_uppercase()).await; + + // We now have 2 addresses in the set. + assert_eq!(2, fixture.dapps_members().await.len()); + + // We then produce some claims and listen for them. + let mut epochs = vec![0; dapps.len()]; + let indexes = vec![2, 2, 0]; + multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await; + let indexes = vec![2, 2]; + assert_listen(&mut listener, &dapps, &indexes).await; + + // Now we have 1 address because one of the duplicates got deleted. + assert_eq!(1, fixture.dapps_members().await.len()); + } + #[tokio::test] async fn multidapp_listen_with_changing_dapps() { let docker = Cli::default(); @@ -554,7 +591,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(first_batch_dapps.clone()).await; + fixture.dapps_set(first_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &first_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -575,7 +612,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(second_batch_dapps.clone()).await; + fixture.dapps_set(second_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &second_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -596,7 +633,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(third_batch_dapps.clone()).await; + fixture.dapps_set(third_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &third_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -617,7 +654,7 @@ mod tests { println!("=== Epochs: {:?}", epochs); println!("--- Setting dapps..."); - fixture.set_dapps(fourth_batch_dapps.clone()).await; + fixture.dapps_set(fourth_batch_dapps.clone()).await; assert_listen(&mut listener, &dapps, &fourth_batch).await; let mut dapps = streams_to_vec(&listener.streams); dapps.sort(); @@ -761,7 +798,7 @@ mod tests { // Removes the last dapp. assert!(dapps.pop().is_some()); - fixture.set_dapps(dapps.clone()).await; + fixture.dapps_set(dapps.clone()).await; let mut buffers = vec![ vec![0, 1], // diff --git a/offchain/rollups-events/src/broker/mod.rs b/offchain/rollups-events/src/broker/mod.rs index be9118997..529d1d1ee 100644 --- a/offchain/rollups-events/src/broker/mod.rs +++ b/offchain/rollups-events/src/broker/mod.rs @@ -338,8 +338,8 @@ impl Broker { /// Consume the next event from one of the streams. /// - /// This function blocks until a new event is available in one of the streams, - /// and retries whenever a timeout happens instead of returning an error. + /// This function blocks until a new event is available in one of the streams. + /// It timeouts with BrokerError::FailedToConsume. /// /// To consume the first event for a stream, `last_consumed_id[...]` should be `INITIAL_ID`. #[tracing::instrument(level = "trace", skip_all)] @@ -352,45 +352,67 @@ impl Broker { let (streams, last_consumed_ids): (Vec<_>, Vec<_>) = streams.into_iter().map(identity).unzip(); - loop { - let result = self - ._consume_blocking_from_multiple_streams( - &streams, - &last_consumed_ids, - ) - .await; + let result = self + ._consume_blocking_from_multiple_streams( + &streams, + &last_consumed_ids, + ) + .await; - if let Err(BrokerError::ConsumeTimeout) = result { - tracing::trace!("consume timed out, retrying"); - } else { - return result; - } + if let Err(BrokerError::ConsumeTimeout) = result { + Err(BrokerError::FailedToConsume) + } else { + result } } - /// Gets the dapp addresses. #[tracing::instrument(level = "trace", skip_all)] - pub async fn get_dapps(&mut self) -> Result, BrokerError> { - retry(self.backoff.clone(), || async { + pub async fn _get_dapps(&mut self) -> Result, BrokerError> { + let reply = retry(self.backoff.clone(), || async { tracing::trace!(key = DAPPS_KEY, "getting key"); let reply: Vec = self.connection.clone().smembers(DAPPS_KEY).await?; - let dapp_addresses: Vec
= reply - .iter() - .map(|s| Address::from_str(s).unwrap()) - .collect(); + + let mut dapp_addresses: Vec
= vec![]; + for value in reply { + let normalized = value.to_lowercase(); + let dapp_address = Address::from_str(&normalized).unwrap(); + if dapp_addresses.contains(&dapp_address) { + let _: () = + self.connection.clone().srem(DAPPS_KEY, value).await?; + } else { + dapp_addresses.push(dapp_address); + } + } + Ok(dapp_addresses) }) .await - .context(ConnectionSnafu) + .context(ConnectionSnafu)?; + + if reply.is_empty() { + Err(BrokerError::ConsumeTimeout) + } else { + Ok(reply) + } + } + + /// Gets the dapp addresses. + pub async fn get_dapps(&mut self) -> Result, BrokerError> { + loop { + let result = self._get_dapps().await; + if let Err(BrokerError::ConsumeTimeout) = result { + tracing::trace!("consume timed out, retrying"); + } else { + return result; + } + } } /// Sets the dapp addresses. + /// NOTE: this function is used strictly for testing. #[tracing::instrument(level = "trace", skip_all)] - pub async fn set_dapps( - &mut self, - dapp_addresses: Vec
, - ) -> Result<(), BrokerError> { + pub async fn dapps_set(&mut self, dapp_addresses: Vec
) { tracing::trace!(key = DAPPS_KEY, "setting key"); let _: () = self.connection.clone().del(DAPPS_KEY).await.unwrap(); for dapp_address in dapp_addresses { @@ -401,7 +423,26 @@ impl Broker { .await .unwrap(); } - Ok(()) + } + + /// Adds a dapp address (as a string). + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&mut self, dapp_address: String) { + tracing::trace!(dapp = dapp_address, "adding dapp"); + self.connection + .clone() + .sadd(DAPPS_KEY, dapp_address) + .await + .unwrap() + } + + /// Gets the dapp addresses as strings. + /// NOTE: this function is used strictly for testing. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&mut self) -> Vec { + tracing::trace!("getting dapps members"); + self.connection.clone().smembers(DAPPS_KEY).await.unwrap() } } diff --git a/offchain/test-fixtures/src/broker.rs b/offchain/test-fixtures/src/broker.rs index b2d435472..a384b757f 100644 --- a/offchain/test-fixtures/src/broker.rs +++ b/offchain/test-fixtures/src/broker.rs @@ -299,8 +299,18 @@ impl ClaimerMultidappBrokerFixture<'_> { } #[tracing::instrument(level = "trace", skip_all)] - pub async fn set_dapps(&self, dapps: Vec
) { - self.client.lock().await.set_dapps(dapps).await.unwrap() + pub async fn dapps_set(&self, dapps: Vec
) { + self.client.lock().await.dapps_set(dapps).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_add(&self, dapp: String) { + self.client.lock().await.dapps_add(dapp).await + } + + #[tracing::instrument(level = "trace", skip_all)] + pub async fn dapps_members(&self) -> Vec { + self.client.lock().await.dapps_members().await } // Different from the default function,