Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Jul 31, 2024
1 parent 93f822a commit 5072422
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 60 deletions.
1 change: 1 addition & 0 deletions offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 30 additions & 13 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

use async_trait::async_trait;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::{info, trace};
use std::{collections::HashMap, fmt::Debug};
use tracing::{debug, info};

use rollups_events::Address;

use crate::{
checker::DuplicateChecker, listener::BrokerListener,
Expand All @@ -31,6 +33,9 @@ pub enum ClaimerError<
D: DuplicateChecker,
T: TransactionSender,
> {
#[snafu(display("invalid app address {:?}", app_address))]
InvalidAppAddress { app_address: Address },

#[snafu(display("broker listener error"))]
BrokerListenerError { source: B::Error },

Expand All @@ -45,16 +50,16 @@ pub enum ClaimerError<
// DefaultClaimer
// ------------------------------------------------------------------------------------------------

/// The `DefaultClaimer` must be injected with a
/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`.
/// The `DefaultClaimer` must be injected with a `BrokerListener`, a map of `Address` to
/// `DuplicateChecker`, and a `TransactionSender`.
#[derive(Debug)]
pub struct DefaultClaimer<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
broker_listener: B,
duplicate_checker: D,
duplicate_checkers: HashMap<Address, D>,
transaction_sender: T,
}

Expand All @@ -63,12 +68,12 @@ impl<B: BrokerListener, D: DuplicateChecker, T: TransactionSender>
{
pub fn new(
broker_listener: B,
duplicate_checker: D,
duplicate_checkers: HashMap<Address, D>,
transaction_sender: T,
) -> Self {
Self {
broker_listener,
duplicate_checker,
duplicate_checkers,
transaction_sender,
}
}
Expand All @@ -84,25 +89,37 @@ where
type Error = ClaimerError<B, D, T>;

async fn start(mut self) -> Result<(), Self::Error> {
trace!("Starting the authority claimer loop");
debug!("Starting the authority claimer loop");
loop {
let rollups_claim = self
// Listens for claims.
let (app_address, rollups_claim) = self
.broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
trace!("Got a claim from the broker: {:?}", rollups_claim);
debug!("Got a claim from the broker: {:?}", rollups_claim);

let is_duplicated_rollups_claim = self
.duplicate_checker
// Gets the duplicate checker.
let duplicate_checker = self
.duplicate_checkers
.get_mut(&app_address)
.ok_or(ClaimerError::InvalidAppAddress {
app_address: app_address.clone(),
})?;

// Checks for duplicates.
let is_duplicated_rollups_claim = duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;

// If it is a duplicate, the loop continues.
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
debug!("It was a duplicated claim");
continue;
}

// Sends the claim.
info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
Expand Down
7 changes: 6 additions & 1 deletion offchain/authority-claimer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod metrics;
pub mod sender;
pub mod signer;

use std::collections::HashMap;

use config::Config;
use snafu::Error;
use tracing::trace;
Expand Down Expand Up @@ -53,9 +55,12 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
.await?;

// Creating the claimer loop.
let mut duplicate_checkers = HashMap::new();
duplicate_checkers
.insert(rollups_events::Address::default(), duplicate_checker); // TODO
let claimer = DefaultClaimer::new(
broker_listener,
duplicate_checker,
duplicate_checkers,
transaction_sender,
);
let claimer_handle = claimer.start();
Expand Down
63 changes: 44 additions & 19 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@

use async_trait::async_trait;
use rollups_events::{
Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream,
INITIAL_ID,
Address, Broker, BrokerConfig, BrokerError, RollupsClaim,
RollupsClaimsStream, INITIAL_ID,
};
use snafu::ResultExt;
use std::fmt::Debug;
use std::{collections::HashMap, fmt::Debug};

/// The `BrokerListener` listens for new claims from the broker
/// The `BrokerListener` listens for new claims from the broker.
#[async_trait]
pub trait BrokerListener: Debug {
type Error: snafu::Error + 'static;

/// Listen to claims
async fn listen(&mut self) -> Result<RollupsClaim, Self::Error>;
async fn listen(&mut self) -> Result<(Address, RollupsClaim), Self::Error>;
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -25,8 +24,9 @@ pub trait BrokerListener: Debug {
#[derive(Debug)]
pub struct DefaultBrokerListener {
broker: Broker,
stream: RollupsClaimsStream,
last_claim_id: String,

// (chain_id, address) => last_claim_id
last_claim_ids: HashMap<(u64, Address), String>,
}

#[derive(Debug, snafu::Snafu)]
Expand All @@ -38,16 +38,23 @@ pub enum BrokerListenerError {
impl DefaultBrokerListener {
pub async fn new(
broker_config: BrokerConfig,
chain_id: u64,
streams: Vec<(u64, Address)>, // Vec<(chain_id, app_address)>
) -> Result<Self, BrokerError> {
tracing::trace!("Connecting to the broker ({:?})", broker_config);
let broker = Broker::new(broker_config).await?;
let stream = RollupsClaimsStream::new(chain_id);
let last_claim_id = INITIAL_ID.to_string();

// (chain_id, app_address) => last_claim_id
let mut last_claim_ids: HashMap<(u64, Address), String> =
HashMap::new();
let initial_id = INITIAL_ID.to_string();
for (chain_id, app_address) in streams {
last_claim_ids
.insert((chain_id, app_address.clone()), initial_id.clone());
}

Ok(Self {
broker,
stream,
last_claim_id,
last_claim_ids,
})
}
}
Expand All @@ -56,17 +63,35 @@ impl DefaultBrokerListener {
impl BrokerListener for DefaultBrokerListener {
type Error = BrokerListenerError;

async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> {
tracing::trace!("Waiting for claim with id {}", self.last_claim_id);
let event = self
async fn listen(&mut self) -> Result<(Address, RollupsClaim), Self::Error> {
// TODO
// tracing::trace!("Waiting for claim with id {}", self.last_claim_id);

let mut streams: Vec<RollupsClaimsStream> = vec![];
let mut last_ids: Vec<String> = vec![];
for ((chain_id, app_address), last_id) in self.last_claim_ids.iter() {
let chain_id = *chain_id;
let app_address = app_address.clone();
streams.push(RollupsClaimsStream::new(chain_id, app_address));
last_ids.push(last_id.clone());
}

let (key, event) = self
.broker
.consume_blocking(&self.stream, &self.last_claim_id)
.consume_blocking_from_multiple_streams(streams, last_ids)
.await
.context(BrokerSnafu)?;

self.last_claim_id = event.id;
let (chain_id, app_address): (u64, Address) =
RollupsClaimsStream::parse_key(key);

let replaced = self
.last_claim_ids
.insert((chain_id, app_address.clone()), event.id);
assert!(replaced.is_some());

Ok(event.payload)
let rollups_claim = event.payload;
Ok((app_address, rollups_claim))
}
}

Expand Down
1 change: 1 addition & 0 deletions offchain/rollups-events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ base64.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
hex.workspace = true
prometheus-client.workspace = true
regex.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
snafu.workspace = true
Expand Down
70 changes: 69 additions & 1 deletion offchain/rollups-events/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl Broker {
}
}
}

/// Consume the next event in stream without blocking
/// This function returns None if there are no more remaining events.
/// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`.
Expand Down Expand Up @@ -280,6 +279,74 @@ impl Broker {
Ok(None)
}
}

/// Consumes the next event from any one of multiple streams, without
/// retrying on timeout.
///
/// Blocks until an event arrives on one of `streams` or the consume
/// timeout elapses (in which case `BrokerError::ConsumeTimeout` is
/// returned). On success, returns the stream the event came from,
/// together with the parsed event.
///
/// `streams` and `last_consumed_ids` are parallel slices:
/// `last_consumed_ids[i]` is the id of the last event already consumed
/// from `streams[i]`.
#[tracing::instrument(level = "trace", skip_all)]
async fn _consume_blocking_from_multiple_streams<S: BrokerStream>(
    &mut self,
    streams: &[S],
    last_consumed_ids: &[String],
) -> Result<(S, Event<S::Payload>), BrokerError> {
    // The stream keys and read options do not change between retry
    // attempts, so build them once instead of inside the retry closure.
    let stream_keys: Vec<String> = streams
        .iter()
        .map(|stream| stream.key().to_string())
        .collect();
    let opts = StreamReadOptions::default()
        .count(1)
        .block(self.consume_timeout);

    let reply = retry(self.backoff.clone(), || async {
        let reply: StreamReadReply = self
            .connection
            .clone()
            .xread_options(&stream_keys, &last_consumed_ids, &opts)
            .await?;

        Ok(reply)
    })
    .await
    .context(ConnectionSnafu)?;

    tracing::trace!("checking for timeout");
    // An empty reply means XREAD returned because the block timeout
    // elapsed, not because an event arrived.
    if reply.keys.is_empty() {
        return Err(BrokerError::ConsumeTimeout);
    }

    tracing::trace!("checking if any events were received");
    // XREAD answers with one entry per stream key; return the first key
    // that actually carries an event.
    for mut stream_key in reply.keys {
        if let Some(event) = stream_key.ids.pop() {
            tracing::trace!("parsing received event");
            let stream = S::from_key(stream_key.key);
            let event = event.try_into()?;
            return Ok((stream, event));
        }
    }

    // The reply had keys but none of them carried an event.
    Err(BrokerError::FailedToConsume)
}

/// Consumes the next event from any one of multiple streams.
///
/// Blocks indefinitely: whenever an internal read times out, the read is
/// simply retried, so this only returns when an event arrives or a
/// non-timeout error occurs. On success, returns the stream the event
/// came from, together with the parsed event.
///
/// `streams` and `last_consumed_ids` are parallel vectors:
/// `last_consumed_ids[i]` is the id of the last event already consumed
/// from `streams[i]`.
///
/// # Panics
///
/// Panics if `streams` and `last_consumed_ids` have different lengths.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn consume_blocking_from_multiple_streams<S: BrokerStream>(
    &mut self,
    streams: Vec<S>,
    last_consumed_ids: Vec<String>,
) -> Result<(S, Event<S::Payload>), BrokerError> {
    assert_eq!(
        streams.len(),
        last_consumed_ids.len(),
        "each stream must have a matching last consumed id",
    );
    loop {
        let result = self
            ._consume_blocking_from_multiple_streams(
                &streams,
                &last_consumed_ids,
            )
            .await;

        match result {
            // Timeouts are expected while no event is available; retry.
            Err(BrokerError::ConsumeTimeout) => {
                tracing::trace!("consume timed out, retrying");
            }
            other => return other,
        }
    }
}
}

/// Custom implementation of Debug because ConnectionManager doesn't implement debug
Expand All @@ -295,6 +362,7 @@ impl fmt::Debug for Broker {
pub trait BrokerStream {
type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq;
fn key(&self) -> &str;
fn from_key(key: String) -> Self;
}

/// Event that goes through the broker
Expand Down
4 changes: 3 additions & 1 deletion offchain/rollups-events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ pub use rollups_outputs::{
RollupsOutput, RollupsOutputEnum, RollupsOutputValidityProof,
RollupsOutputsStream, RollupsProof, RollupsReport, RollupsVoucher,
};
pub use rollups_stream::{DAppMetadata, DAppMetadataCLIConfig};
pub use rollups_stream::{
parse_stream_with_key, DAppMetadata, DAppMetadataCLIConfig,
};
23 changes: 2 additions & 21 deletions offchain/rollups-events/src/rollups_claims.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,9 @@

use serde::{Deserialize, Serialize};

use crate::{Address, BrokerStream, Hash};
use crate::{rollups_stream::decl_broker_stream, Address, Hash};

#[derive(Debug)]
pub struct RollupsClaimsStream {
key: String,
}

impl BrokerStream for RollupsClaimsStream {
type Payload = RollupsClaim;

fn key(&self) -> &str {
&self.key
}
}

impl RollupsClaimsStream {
pub fn new(chain_id: u64) -> Self {
Self {
key: format!("{{chain-{}}}:rollups-claims", chain_id),
}
}
}
decl_broker_stream!(RollupsClaimsStream, RollupsClaim, "rollups-claim");

/// Event generated when the Cartesi Rollups epoch finishes
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion offchain/rollups-events/src/rollups_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{rollups_stream::decl_broker_stream, Address, Hash, Payload};

decl_broker_stream!(RollupsOutputsStream, RollupsOutput, "rollups-outputs");

/// Cartesi output
/// Cartesi output
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum RollupsOutput {
AdvanceResult(RollupsAdvanceResult),
Expand Down
Loading

0 comments on commit 5072422

Please sign in to comment.