Skip to content

Commit

Permalink
feat(offchain): implement the broker-listener for the authority-claimer
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 authored and marcelstanley committed Sep 8, 2023
1 parent bd4a270 commit 5808482
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 164 deletions.
4 changes: 3 additions & 1 deletion offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl BrokerFacade {
loop {
let result = self
.client
.consume_blocking(&self.inputs_stream, last_id)
.consume_blocking_deprecated(&self.inputs_stream, last_id)
.await;
if matches!(result, Err(BrokerError::ConsumeTimeout)) {
tracing::trace!("consume timed out, retrying");
Expand Down
7 changes: 6 additions & 1 deletion offchain/authority-claimer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ rollups-events = { path = "../rollups-events" }
async-trait.workspace = true
clap = { workspace = true, features = ["derive"] }
eth-tx-manager.workspace = true
ethers.workspace = true
rusoto_core.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true

[dev-dependencies]
test-fixtures = { path = "../test-fixtures" }

backoff = { workspace = true, features = ["tokio"] }
testcontainers.workspace = true
13 changes: 6 additions & 7 deletions offchain/authority-claimer/src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use rollups_events::RollupsClaim;
use snafu::Snafu;
use std::fmt::Debug;

/// The `DuplicateChecker` checks if a given claim was already submitted
/// to the blockchain.
/// The `DuplicateChecker` checks if a given claim was already submitted to the blockchain.
#[async_trait]
pub trait DuplicateChecker: Debug {
type Error: snafu::Error;
type Error: snafu::Error + 'static;

async fn is_duplicated_rollups_claim(
&self,
Expand All @@ -26,24 +25,24 @@ pub trait DuplicateChecker: Debug {
pub struct DefaultDuplicateChecker;

#[derive(Debug, Snafu)]
pub enum DefaultDuplicateCheckerError {
pub enum DuplicateCheckerError {
Todo,
}

impl DefaultDuplicateChecker {
pub fn new() -> Result<Self, DefaultDuplicateCheckerError> {
pub fn new() -> Result<Self, DuplicateCheckerError> {
todo!()
}
}

#[async_trait]
impl DuplicateChecker for DefaultDuplicateChecker {
type Error = DefaultDuplicateCheckerError;
type Error = DuplicateCheckerError;

async fn is_duplicated_rollups_claim(
&self,
_rollups_claim: &RollupsClaim,
) -> Result<bool, Self::Error> {
todo!()
Err(DuplicateCheckerError::Todo)
}
}
128 changes: 60 additions & 68 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,80 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use async_trait::async_trait;
use rollups_events::RollupsClaim;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::{info, trace};

use crate::{
checker::DuplicateChecker, listener::BrokerListener,
sender::TransactionSender,
};
use crate::{checker::DuplicateChecker, sender::TransactionSender};

/// The `AuthorityClaimer` starts an event loop that waits for claim messages
/// from the broker, and then sends the claims to the blockchain. It checks to
/// see if the claim is duplicated before sending.
///
/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and
/// `TransactionSender`, to, respectivelly, listen for messages, check for
/// duplicated claims, and send claims to the blockchain.
/// The `Claimer` sends claims to the blockchain. It checks
/// whether the claim is duplicated before sending.
#[async_trait]
pub trait AuthorityClaimer {
async fn start<L, C, S>(
&self,
broker_listener: L,
duplicate_checker: C,
transaction_sender: S,
) -> Result<(), AuthorityClaimerError<L, C, S>>
where
L: BrokerListener + Send + Sync,
C: DuplicateChecker + Send + Sync,
S: TransactionSender + Send,
{
trace!("Starting the authority claimer loop");
let mut transaction_sender = transaction_sender;
loop {
let rollups_claim = broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
trace!("Got a claim from the broker: {:?}", rollups_claim);
pub trait Claimer: Sized + Debug {
type Error: snafu::Error + 'static;

let is_duplicated_rollups_claim = duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicateCheckerSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
continue;
}

info!("Sending a new rollups claim");
transaction_sender = transaction_sender
.send_rollups_claim(rollups_claim)
.await
.context(TransactionSenderSnafu)?
}
}
async fn send_rollups_claim(
self,
rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error>;
}

#[derive(Debug, snafu::Snafu)]
pub enum AuthorityClaimerError<
L: BrokerListener + 'static,
C: DuplicateChecker + 'static,
S: TransactionSender + 'static,
> {
#[snafu(display("broker listener error"))]
BrokerListenerError { source: L::Error },

#[snafu(display("duplicate checker error"))]
DuplicateCheckerError { source: C::Error },
pub enum ClaimerError<D: DuplicateChecker, T: TransactionSender> {
#[snafu(display("duplicated claim error"))]
DuplicatedClaimError { source: D::Error },

#[snafu(display("transaction sender error"))]
TransactionSenderError { source: S::Error },
TransactionSenderError { source: T::Error },
}

// ------------------------------------------------------------------------------------------------
// DefaultAuthorityClaimer
// ------------------------------------------------------------------------------------------------

#[derive(Default)]
pub struct DefaultAuthorityClaimer;
/// The `AbstractClaimer` must be injected with a
/// `DuplicateChecker` and a `TransactionSender`.
#[derive(Debug)]
pub struct AbstractClaimer<D: DuplicateChecker, T: TransactionSender> {
duplicate_checker: D,
transaction_sender: T,
}

impl DefaultAuthorityClaimer {
pub fn new() -> Self {
Self
impl<D: DuplicateChecker, T: TransactionSender> AbstractClaimer<D, T> {
pub fn new(duplicate_checker: D, transaction_sender: T) -> Self {
Self {
duplicate_checker,
transaction_sender,
}
}
}

impl AuthorityClaimer for DefaultAuthorityClaimer {}
#[async_trait]
impl<D, T> Claimer for AbstractClaimer<D, T>
where
D: DuplicateChecker + Send + Sync + 'static,
T: TransactionSender + Send + 'static,
{
type Error = ClaimerError<D, T>;

async fn send_rollups_claim(
mut self,
rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error> {
let is_duplicated_rollups_claim = self
.duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
return Ok(self);
}

info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
.send_rollups_claim_transaction(rollups_claim)
.await
.context(TransactionSenderSnafu)?;

Ok(self)
}
}
71 changes: 38 additions & 33 deletions offchain/authority-claimer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,64 +8,69 @@ pub mod listener;
pub mod metrics;
pub mod sender;

#[cfg(test)]
mod mock;

use config::Config;
use rollups_events::DAppMetadata;
use snafu::Error;
use tracing::trace;

use crate::{
checker::DefaultDuplicateChecker,
claimer::{AuthorityClaimer, DefaultAuthorityClaimer},
listener::DefaultBrokerListener,
claimer::AbstractClaimer,
listener::{BrokerListener, DefaultBrokerListener},
metrics::AuthorityClaimerMetrics,
sender::DefaultTransactionSender,
};

pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
tracing::info!(?config, "starting authority-claimer");
tracing::info!(?config, "Starting the authority-claimer");

// Creating the metrics and health server.
let metrics = AuthorityClaimerMetrics::new();
let http_server_handle =
http_server::start(config.http_server_config, metrics.clone().into());

let dapp_address = config.authority_claimer_config.dapp_address;
let dapp_metadata = DAppMetadata {
chain_id: config.authority_claimer_config.tx_manager_config.chain_id,
dapp_address,
};
let claimer_handle = {
let config = config.authority_claimer_config;

let dapp_address = config.dapp_address;
let dapp_metadata = DAppMetadata {
chain_id: config.tx_manager_config.chain_id,
dapp_address,
};

// Creating the broker listener.
trace!("Creating the broker listener");
let broker_listener = DefaultBrokerListener::new(
config.authority_claimer_config.broker_config.clone(),
dapp_metadata.clone(),
metrics.clone(),
)
.map_err(Box::new)?;
// Creating the duplicate checker.
trace!("Creating the duplicate checker");
let duplicate_checker = DefaultDuplicateChecker::new()?;

// Creating the duplicate checker.
trace!("Creating the duplicate checker");
let duplicate_checker = DefaultDuplicateChecker::new().map_err(Box::new)?;
// Creating the transaction sender.
trace!("Creating the transaction sender");
let transaction_sender = DefaultTransactionSender::new(
dapp_metadata.clone(),
metrics.clone(),
)?;

// Creating the transaction sender.
trace!("Creating the transaction sender");
let transaction_sender =
DefaultTransactionSender::new(dapp_metadata, metrics)
.map_err(Box::new)?;
// Creating the broker listener.
trace!("Creating the broker listener");
let broker_listener =
DefaultBrokerListener::new(config.broker_config, dapp_metadata)
.await?;

// Creating the claimer loop.
let authority_claimer = DefaultAuthorityClaimer::new();
let claimer_handle = authority_claimer.start(
broker_listener,
duplicate_checker,
transaction_sender,
);
// Creating the claimer.
trace!("Creating the claimer");
let claimer =
AbstractClaimer::new(duplicate_checker, transaction_sender);

// Returning the claimer event loop.
broker_listener.start(claimer)
};

// Starting the HTTP server and the claimer loop.
tokio::select! {
ret = http_server_handle => { ret.map_err(Box::new)? }
ret = claimer_handle => { ret.map_err(Box::new)? }
ret = http_server_handle => { ret? }
ret = claimer_handle => { ret? }
};

unreachable!()
Expand Down
Loading

0 comments on commit 5808482

Please sign in to comment.