fixup! feat: add the MultidappClaimer as an experimental feature for sunodo

renan061 committed Aug 16, 2024
1 parent 691d5ed commit 54c0204
Showing 4 changed files with 167 additions and 47 deletions.
156 changes: 133 additions & 23 deletions offchain/authority-claimer/src/listener.rs
@@ -88,6 +88,7 @@ impl BrokerListener for DefaultBrokerListener {
 pub struct MultidappBrokerListener {
     broker: Broker,
     streams: HashMap<RollupsClaimsStream, String>, // stream => last-claim-id
+    buffer: HashMap<RollupsClaimsStream, RollupsClaim>,
     chain_id: u64,
 }

@@ -102,9 +103,11 @@ impl MultidappBrokerListener {
         );
         let broker = Broker::new(broker_config).await?;
         let streams = HashMap::new();
+        let buffer = HashMap::new();
         Ok(Self {
             broker,
             streams,
+            buffer,
             chain_id,
         })
     }
@@ -116,51 +119,84 @@ impl MultidappBrokerListener {
     async fn update_streams(&mut self) -> Result<(), BrokerListenerError> {
         let initial_id = INITIAL_ID.to_string();
 
-        let streams: Vec<_> =
-            self.broker.get_dapps().await.context(BrokerSnafu)?;
-        tracing::info!("Got the following dapps: {:?}", self.streams);
+        // Gets the dapps from the broker.
+        let dapps = self.broker.get_dapps().await.context(BrokerSnafu)?;
+        if dapps.is_empty() {
+            return Err(BrokerListenerError::NoApplicationsConfigured);
+        }
+        tracing::info!(
+            "Got the following dapps from key \"{}\": {:?}",
+            rollups_events::DAPPS_KEY,
+            dapps
+        );
 
-        let streams: Vec<_> = streams
+        // Converts dapps to streams.
+        let streams: Vec<_> = dapps
             .into_iter()
             .map(|dapp_address| {
-                let dapp_metadata = &DAppMetadata {
+                RollupsClaimsStream::new(&DAppMetadata {
                     chain_id: self.chain_id,
                     dapp_address,
-                };
-                let stream = RollupsClaimsStream::new(dapp_metadata);
-                let id = self.streams.get(&stream).unwrap_or(&initial_id);
-                (stream, id.clone())
+                })
             })
             .collect();
-        if streams.is_empty() {
-            return Err(BrokerListenerError::NoApplicationsConfigured);
+
+        // Removes obsolete dapps from the buffer, if any.
+        for key in self.buffer.clone().keys() {
+            if !streams.contains(key) {
+                self.buffer.remove(key);
+            }
         }
+
+        // Adds the last consumed ids.
+        let streams: Vec<_> = streams
+            .into_iter()
+            .map(|stream| {
+                let id = self.streams.get(&stream).unwrap_or(&initial_id);
+                (stream, id.to_string())
+            })
+            .collect();
 
         self.streams = HashMap::from_iter(streams);
         Ok(())
     }
+
+    async fn fill_buffer(&mut self) -> Result<(), BrokerListenerError> {
+        let streams_and_events: Vec<_> = self
+            .broker
+            .consume_blocking_from_multiple_streams(self.streams.clone())
+            .await
+            .context(BrokerSnafu)?;
+
+        for (stream, event) in streams_and_events {
+            // Updates the last-consumed-id from the stream.
+            let replaced = self.streams.insert(stream.clone(), event.id);
+            assert!(replaced.is_some());
+
+            let replaced = self.buffer.insert(stream, event.payload);
+            assert!(replaced.is_none());
+        }
+
+        Ok(())
+    }
 }
 
 #[async_trait]
 impl BrokerListener for MultidappBrokerListener {
     type Error = BrokerListenerError;
 
     async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> {
-        tracing::trace!("Waiting for claim");
-
         self.update_streams().await?;
 
-        let (stream, event) = self
-            .broker
-            .consume_blocking_from_multiple_streams(self.streams.clone())
-            .await
-            .context(BrokerSnafu)?;
-
-        // Updates the last-consumed-id from the stream.
-        let replaced = self.streams.insert(stream.clone(), event.id);
-        assert!(replaced.is_some());
+        tracing::trace!("Waiting for a claim");
+        if self.buffer.is_empty() {
+            self.fill_buffer().await?;
+        }
 
-        Ok(event.payload)
+        let buffer = self.buffer.clone();
+        let (stream, rollups_claim) = buffer.into_iter().next().unwrap();
+        self.buffer.remove(&stream);
+        Ok(rollups_claim)
     }
 }

@@ -675,4 +711,78 @@ mod tests {
 
         broker_listener_thread.await.unwrap();
     }
+
+    #[tokio::test]
+    async fn multidapp_listen_buffer_order() {
+        let docker = Cli::default();
+        let (fixture, mut listener, dapps) =
+            setup_multidapp_listener(&docker, false).await.unwrap();
+
+        let mut epochs = vec![0; dapps.len()];
+        let indexes = vec![1, 1, 1, 1, 1, 2, 1, 2, 0];
+        multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
+
+        let mut buffers = vec![
+            vec![0, 1, 2], //
+            vec![1, 2],
+            vec![1],
+            vec![1],
+            vec![1],
+            vec![1],
+        ];
+
+        for buffer in buffers.iter_mut() {
+            for _ in 0..buffer.len() {
+                println!("Buffer: {:?}", buffer);
+                let result = listener.listen().await;
+                assert!(result.is_ok(), "{:?}", result.unwrap_err());
+                let dapp_address = result.unwrap().dapp_address;
+                let index = dapps
+                    .iter()
+                    .position(|address| *address == dapp_address)
+                    .unwrap();
+                let index = buffer.iter().position(|i| *i == index).unwrap();
+                buffer.remove(index);
+            }
+            assert!(buffer.is_empty());
+            println!("Emptied one of the buffers");
+        }
+    }
+
+    #[tokio::test]
+    async fn multidapp_listen_buffer_change() {
+        let docker = Cli::default();
+        let (fixture, mut listener, mut dapps) =
+            setup_multidapp_listener(&docker, false).await.unwrap();
+
+        let mut epochs = vec![0; dapps.len()];
+        let indexes = vec![2, 2, 2, 0, 1, 0];
+        multidapp_produce_claims(&fixture, &mut epochs, &dapps, &indexes).await;
+
+        // Removes the last dapp.
+        assert!(dapps.pop().is_some());
+        fixture.set_dapps(dapps.clone()).await;
+
+        let mut buffers = vec![
+            vec![0, 1], //
+            vec![0],
+        ];
+
+        for buffer in buffers.iter_mut() {
+            for _ in 0..buffer.len() {
+                println!("Buffer: {:?}", buffer);
+                let result = listener.listen().await;
+                assert!(result.is_ok(), "{:?}", result.unwrap_err());
+                let dapp_address = result.unwrap().dapp_address;
+                let index = dapps
+                    .iter()
+                    .position(|address| *address == dapp_address)
+                    .unwrap();
+                let index = buffer.iter().position(|i| *i == index).unwrap();
+                buffer.remove(index);
+            }
+            assert!(buffer.is_empty());
+            println!("Emptied one of the buffers");
+        }
+    }
 }
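Note on the listener change above: listen() no longer performs one blocking broker call per claim. It refreshes the stream set, refills an in-memory buffer with at most one event per stream when the buffer is empty, and then drains the buffer one claim per call. Below is a minimal, self-contained sketch of that buffer-then-drain pattern; Listener, Stream, Claim, and fetch_batch are simplified stand-ins for MultidappBrokerListener, RollupsClaimsStream, RollupsClaim, and consume_blocking_from_multiple_streams, not the real types.

use std::collections::HashMap;

type Stream = String;
type Claim = u64;

struct Listener {
    buffer: HashMap<Stream, Claim>,
}

impl Listener {
    // Stand-in for the blocking broker call: returns one event per stream.
    fn fetch_batch(&self) -> Vec<(Stream, Claim)> {
        vec![("dapp-a".into(), 1), ("dapp-b".into(), 2)]
    }

    // Mirrors listen(): refill only when empty, then hand out one buffered claim.
    fn listen(&mut self) -> Claim {
        if self.buffer.is_empty() {
            for (stream, claim) in self.fetch_batch() {
                self.buffer.insert(stream, claim);
            }
        }
        // Pick an arbitrary buffered claim and drop it from the buffer.
        let stream = self.buffer.keys().next().unwrap().clone();
        self.buffer.remove(&stream).unwrap()
    }
}

fn main() {
    let mut listener = Listener { buffer: HashMap::new() };
    // Two claims served by a single (simulated) broker round-trip.
    println!("{}", listener.listen());
    println!("{}", listener.listen());
}

As in the real listener, the order in which claims come out of a batch is arbitrary (HashMap iteration order), which is why the multidapp_listen_buffer_order test above checks membership in an expected set per batch rather than a fixed sequence.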
18 changes: 12 additions & 6 deletions offchain/rollups-events/src/broker/mod.rs
@@ -27,7 +27,7 @@ use crate::Address;
 pub mod indexer;
 
 pub const INITIAL_ID: &str = "0";
-const DAPPS_KEY: &str = "experimental-dapp-addresses-config";
+pub const DAPPS_KEY: &str = "experimental-dapp-addresses-config";
 
 /// The `BrokerConnection` enum implements the `ConnectionLike` trait
 /// to satisfy the `AsyncCommands` trait bounds.
@@ -292,7 +292,7 @@ impl Broker {
         &mut self,
         streams: &Vec<S>,
         last_consumed_ids: &Vec<String>,
-    ) -> Result<(S, Event<S::Payload>), BrokerError> {
+    ) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> {
         let reply = retry(self.backoff.clone(), || async {
             let stream_keys: Vec<String> = streams
                 .iter()
@@ -318,16 +318,22 @@ impl Broker {
             return Err(BrokerError::ConsumeTimeout);
         }
 
-        tracing::trace!("checking if any events were received");
+        tracing::trace!("getting the consumed events");
+        let mut response: Vec<(S, Event<S::Payload>)> = vec![];
         for mut stream_key in reply.keys {
+            tracing::trace!("parsing stream key {:?}", stream_key);
             if let Some(event) = stream_key.ids.pop() {
                 tracing::trace!("parsing received event");
                 let stream = S::from_key(stream_key.key);
                 let event = event.try_into()?;
-                return Ok((stream, event));
+                response.push((stream, event));
             }
         }
-        return Err(BrokerError::FailedToConsume);
+        if response.is_empty() {
+            Err(BrokerError::FailedToConsume)
+        } else {
+            Ok(response)
+        }
     }
 
     /// Consume the next event from one of the streams.
@@ -342,7 +348,7 @@
     >(
         &mut self,
         streams: HashMap<S, String>, // streams to last-consumed-ids
-    ) -> Result<(S, Event<S::Payload>), BrokerError> {
+    ) -> Result<Vec<(S, Event<S::Payload>)>, BrokerError> {
         let (streams, last_consumed_ids): (Vec<_>, Vec<_>) =
             streams.into_iter().map(identity).unzip();

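The signature change above turns consume_blocking_from_multiple_streams from first-match semantics (return on the first stream with a pending event) into batch semantics: collect every ready event from one reply and fail only if none parsed. A hedged sketch of that collect-then-check shape follows; Reply, Event, and collect_events are simplified stand-ins for the Redis stream reply types and the real method, not the Broker API itself.

struct Event { id: String }
struct Reply { keys: Vec<(String, Vec<Event>)> }

fn collect_events(reply: Reply) -> Result<Vec<(String, Event)>, String> {
    let mut response = Vec::new();
    for (key, mut ids) in reply.keys {
        // Streams with no new entries come back empty; skip them.
        if let Some(event) = ids.pop() {
            response.push((key, event));
        }
    }
    // Mirrors the new error handling: an all-empty reply is a consume failure.
    if response.is_empty() {
        Err("failed to consume".into())
    } else {
        Ok(response)
    }
}

fn main() {
    let reply = Reply {
        keys: vec![
            ("stream-a".into(), vec![Event { id: "1-0".into() }]),
            ("stream-b".into(), vec![]),
        ],
    };
    let events = collect_events(reply).unwrap();
    // Only stream-a had a pending entry.
    assert_eq!(1, events.len());
    println!("consumed {} from {}", events[0].1.id, events[0].0);
}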
2 changes: 1 addition & 1 deletion offchain/rollups-events/src/lib.rs
@@ -11,7 +11,7 @@ mod rollups_stream;
 pub use broker::{
     indexer, Broker, BrokerCLIConfig, BrokerConfig, BrokerEndpoint,
     BrokerError, BrokerMultiStream, BrokerStream, Event, RedactedUrl, Url,
-    INITIAL_ID,
+    DAPPS_KEY, INITIAL_ID,
 };
 pub use common::{Address, Hash, Payload, ADDRESS_SIZE, HASH_SIZE};
 pub use rollups_claims::{RollupsClaim, RollupsClaimsStream};
38 changes: 21 additions & 17 deletions offchain/rollups-events/tests/integration.rs
@@ -367,26 +367,28 @@ async fn test_it_consumes_from_multiple_streams() {
 
     // Consumes all events using the broker struct.
     let mut counters = HashMap::new();
-    for _ in 0..(N * streams.len()) {
-        let (stream, event) = broker
+    for _ in 0..streams.len() {
+        let streams_and_events = broker
             .consume_blocking_from_multiple_streams(streams.clone())
             .await
             .expect("failed to consume");
 
-        let i = counters
-            .entry(stream.clone())
-            .and_modify(|n| *n += 1)
-            .or_insert(0)
-            .clone();
-
-        // Asserts that the payload is correct.
-        let expected = format!("{}{}{}", stream.a, stream.b, i);
-        assert_eq!(expected, event.payload.data);
-
-        // Updates the map of streams with the last consumed id.
-        let replaced = streams.insert(stream, event.id);
-        // And asserts that the key from the map was indeed overwritten.
-        assert!(replaced.is_some());
+        for (stream, event) in streams_and_events {
+            let i = counters
+                .entry(stream.clone())
+                .and_modify(|n| *n += 1)
+                .or_insert(0)
+                .clone();
+
+            // Asserts that the payload is correct.
+            let expected = format!("{}{}{}", stream.a, stream.b, i);
+            assert_eq!(expected, event.payload.data);
+
+            // Updates the map of streams with the last consumed id.
+            let replaced = streams.insert(stream, event.id);
+            // And asserts that the key from the map was indeed overwritten.
+            assert!(replaced.is_some());
+        }
     }
 
     // Asserts that N events were consumed from each stream.
@@ -414,10 +416,12 @@
 
     // Consumes the final event.
    let marker = Instant::now();
-    let (final_stream, _) = broker
+    let mut streams_and_events = broker
         .consume_blocking_from_multiple_streams(streams)
         .await
         .expect("failed to consume the final event");
+    assert_eq!(1, streams_and_events.len());
+    let (final_stream, _) = streams_and_events.pop().unwrap();
 
     // Asserts that the main thread blocked for at least WAIT seconds.
     assert!(marker.elapsed().as_secs() >= WAIT);
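With the batch return type, callers take over the per-stream bookkeeping the test above performs: each (stream, event) pair in the returned Vec overwrites that stream's entry in the last-consumed-id map, so the next call resumes every stream where it left off. A small sketch of that bookkeeping, with String ids standing in for the real stream and event types:

use std::collections::HashMap;

fn main() {
    // Each stream starts from "0", i.e., from the beginning.
    let mut streams: HashMap<String, String> = HashMap::from([
        ("stream-a".to_string(), "0".to_string()),
        ("stream-b".to_string(), "0".to_string()),
    ]);

    // Pretend this batch came back from consume_blocking_from_multiple_streams.
    let batch = vec![
        ("stream-a".to_string(), "1-0".to_string()),
        ("stream-b".to_string(), "1-0".to_string()),
    ];

    for (stream, event_id) in batch {
        // Overwrites the id and checks the stream was already being tracked.
        let replaced = streams.insert(stream, event_id);
        assert!(replaced.is_some());
    }

    assert_eq!("1-0", streams["stream-a"]);
}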
