diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e9290c5d3..10ef589fc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -116,83 +116,6 @@ jobs: - name: Check auto generated files run: make check-generate - - test-rust: - runs-on: ubuntu-22.04 - env: - RUSTFLAGS: -D warnings -C debuginfo=0 - defaults: - run: - working-directory: cmd/authority-claimer - steps: - - uses: actions/checkout@v4 - with: - submodules: recursive - - - uses: depot/setup-action@v1 - - name: Build dependency images - uses: depot/bake-action@v1 - with: - files: | - ./docker-bake.hcl - ./docker-bake.override.hcl - ./docker-bake.platforms.hcl - targets: | - rollups-node-snapshot - project: ${{ vars.DEPOT_PROJECT }} - workdir: build - load: true - - - uses: actions/cache@v4 - with: - path: | - ~/.cargo/bin/ - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - ./cmd/authority-claimer/target/ - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - restore-keys: ${{ runner.os }}-cargo- - - - name: Update Rust - run: rustup update - - - name: Install cargo sweep - run: cargo install cargo-sweep - continue-on-error: true - - - name: Install cargo cache - run: cargo install cargo-cache - continue-on-error: true - - - name: Install cargo-machete - run: cargo install cargo-machete - continue-on-error: true - - - name: Set sweep timestamp - run: cargo sweep -s - - - name: Analyze dependencies - run: cargo machete . - - - name: Check code format - run: cargo fmt --all -- --check - - - name: Run linter - run: cargo clippy -- -A clippy::module_inception -A clippy::mixed_attributes_style - - - name: Build binaries and tests - run: cargo build --all-targets - - - name: Clean old build files - run: cargo sweep -f - - - name: Clean dependencies source files - run: cargo cache --autoclean - - - name: Run tests - run: cargo test - test-go: runs-on: ubuntu-22.04 container: @@ -241,7 +164,6 @@ jobs: runs-on: ubuntu-22.04 needs: - do-basic-checks - - test-rust - test-go steps: - uses: actions/checkout@v4 diff --git a/Dockerfile b/Dockerfile index c9c8cf6bc..4aac59387 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,10 +4,8 @@ # syntax=docker.io/docker/dockerfile:1 ARG EMULATOR_VERSION=0.18.1 -ARG RUST_VERSION=1.78.0 # Build directories. -ARG RUST_BUILD_PATH=/build/cartesi/rust ARG GO_BUILD_PATH=/build/cartesi/go FROM cartesi/machine-emulator:${EMULATOR_VERSION} AS common-env @@ -15,7 +13,6 @@ FROM cartesi/machine-emulator:${EMULATOR_VERSION} AS common-env USER root # Re-declare ARGs so they can be used in the RUN block -ARG RUST_BUILD_PATH ARG GO_BUILD_PATH # Install ca-certificates and curl (setup). @@ -23,8 +20,8 @@ RUN < Result<(), Box> { - let contract_path = "consensus"; - let contract_name = "IConsensus"; - let bindings_file_name = "iconsensus.rs"; - - let source_path = path(contract_path, contract_name); - let output_path: PathBuf = - [&std::env::var("OUT_DIR").unwrap(), bindings_file_name] - .iter() - .collect(); - - let bindings = - Abigen::new(contract_name, fs::read_to_string(&source_path)?)? - .generate()?; - bindings.write_to_file(&output_path)?; - - println!("cargo:rerun-if-changed=build.rs"); - Ok(()) -} - -fn path(contract_path: &str, contract_name: &str) -> PathBuf { - Path::new(ROLLUPS_CONTRACTS_PATH) - .join(contract_path) - .join(format!("{}.sol", contract_name)) - .join(format!("{}.json", contract_name)) -} diff --git a/cmd/authority-claimer/rust-toolchain.toml b/cmd/authority-claimer/rust-toolchain.toml deleted file mode 100644 index 1de01fa45..000000000 --- a/cmd/authority-claimer/rust-toolchain.toml +++ /dev/null @@ -1,2 +0,0 @@ -[toolchain] -channel = "1.81.0" diff --git a/cmd/authority-claimer/src/checker.rs b/cmd/authority-claimer/src/checker.rs deleted file mode 100644 index 982623698..000000000 --- a/cmd/authority-claimer/src/checker.rs +++ /dev/null @@ -1,185 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - contracts::iconsensus::IConsensus, - rollups_events::{Address, Hash, RollupsClaim}, -}; -use async_trait::async_trait; -use ethers::{ - self, - contract::ContractError, - providers::{ - Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient, - }, - types::{Address as EthersAddress, H160}, -}; -use snafu::{ensure, ResultExt, Snafu}; -use std::{collections::HashSet, fmt::Debug, sync::Arc}; -use tracing::trace; -use url::{ParseError, Url}; - -const MAX_RETRIES: u32 = 10; -const INITIAL_BACKOFF: u64 = 1000; - -/// The `DuplicateChecker` checks if a given claim was already submitted to the blockchain. -#[async_trait] -pub trait DuplicateChecker: Debug { - type Error: snafu::Error + 'static; - - async fn is_duplicated_rollups_claim( - &mut self, - rollups_claim: &RollupsClaim, - iconsensus: &Address, - ) -> Result; -} - -// ------------------------------------------------------------------------------------------------ -// DefaultDuplicateChecker -// ------------------------------------------------------------------------------------------------ - -#[derive(Debug, Clone, Hash, Eq, PartialEq)] -struct Claim { - application: Address, - last_block: u64, - claim_hash: Hash, -} - -#[derive(Debug)] -pub struct DefaultDuplicateChecker { - provider: Arc>>, - from: EthersAddress, - claims: HashSet, - confirmations: usize, - next_block_to_read: u64, -} - -#[derive(Debug, Snafu)] -pub enum DuplicateCheckerError { - #[snafu(display("failed to call contract"))] - ContractError { - source: ContractError>>, - }, - - #[snafu(display("failed to call provider"))] - ProviderError { - source: ethers::providers::ProviderError, - }, - - #[snafu(display("parser error"))] - ParseError { source: ParseError }, - - #[snafu(display( - "Depth of `{}` higher than latest block `{}`", - depth, - latest - ))] - DepthTooHigh { depth: u64, latest: u64 }, -} - -impl DefaultDuplicateChecker { - pub async fn new( - http_endpoint: String, - from: EthersAddress, - confirmations: usize, - genesis_block: u64, - ) -> Result { - let http = Http::new(Url::parse(&http_endpoint).context(ParseSnafu)?); - let retry_client = RetryClient::new( - http, - Box::new(HttpRateLimitRetryPolicy), - MAX_RETRIES, - INITIAL_BACKOFF, - ); - let provider = Arc::new(Provider::new(retry_client)); - let checker = Self { - provider, - from, - claims: HashSet::new(), - confirmations, - next_block_to_read: genesis_block, - }; - Ok(checker) - } -} - -#[async_trait] -impl DuplicateChecker for DefaultDuplicateChecker { - type Error = DuplicateCheckerError; - - async fn is_duplicated_rollups_claim( - &mut self, - rollups_claim: &RollupsClaim, - iconsensus: &Address, - ) -> Result { - self.update_claims(iconsensus).await?; - let claim = Claim { - application: rollups_claim.dapp_address.clone(), - last_block: rollups_claim.last_block, - claim_hash: rollups_claim.output_merkle_root_hash.clone(), - }; - Ok(self.claims.contains(&claim)) - } -} - -impl DefaultDuplicateChecker { - async fn update_claims( - &mut self, - iconsensus: &Address, - ) -> Result<(), DuplicateCheckerError> { - let depth = self.confirmations as u64; - - let latest = self - .provider - .get_block_number() - .await - .context(ProviderSnafu)? - .as_u64(); - - ensure!(depth <= latest, DepthTooHighSnafu { depth, latest }); - let latest = latest - depth; - - if latest < self.next_block_to_read { - trace!( - "nothing to read; next block is {}, but current block is {}", - self.next_block_to_read, - latest - ); - return Ok(()); - } - - let iconsensus = IConsensus::new( - H160(iconsensus.inner().to_owned()), - self.provider.clone(), - ); - - let claims = iconsensus - .claim_submission_filter() - .from_block(self.next_block_to_read) - .to_block(latest) - .topic1(self.from) - .query() - .await - .context(ContractSnafu)?; - - trace!( - "read new claims {:?} from block {} to {}", - claims, - self.next_block_to_read, - latest - ); - - for claim_submission in claims.into_iter() { - let claim = Claim { - application: Address::new(claim_submission.app_contract.into()), - last_block: claim_submission - .last_processed_block_number - .as_u64(), - claim_hash: Hash::new(claim_submission.claim), - }; - self.claims.insert(claim); - } - self.next_block_to_read = latest + 1; - Ok(()) - } -} diff --git a/cmd/authority-claimer/src/claimer.rs b/cmd/authority-claimer/src/claimer.rs deleted file mode 100644 index 373ff0bbb..000000000 --- a/cmd/authority-claimer/src/claimer.rs +++ /dev/null @@ -1,121 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - checker::DuplicateChecker, repository::Repository, - sender::TransactionSender, -}; -use async_trait::async_trait; -use ethers::types::H256; -use snafu::ResultExt; -use std::fmt::Debug; -use tracing::{info, trace}; - -/// The `Claimer` starts an event loop that waits for claim messages -/// from the broker, and then sends the claims to the blockchain. It checks to -/// see if the claim is duplicated before sending. -/// -/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and -/// `TransactionSender`, to, respectively, listen for messages, check for -/// duplicated claims, and send claims to the blockchain. -#[async_trait] -pub trait Claimer: Sized + Debug { - type Error: snafu::Error + 'static; - - async fn start(mut self) -> Result<(), Self::Error>; -} - -#[derive(Debug, snafu::Snafu)] -pub enum ClaimerError -{ - #[snafu(display("repository error"))] - Repository { source: R::Error }, - - #[snafu(display("duplicated claim error"))] - DuplicatedClaim { source: D::Error }, - - #[snafu(display("transaction sender error"))] - TransactionSender { source: T::Error }, -} - -// ------------------------------------------------------------------------------------------------ -// DefaultClaimer -// ------------------------------------------------------------------------------------------------ - -/// The `DefaultClaimer` must be injected with a -/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`. -#[derive(Debug)] -pub struct DefaultClaimer< - R: Repository, - D: DuplicateChecker, - T: TransactionSender, -> { - repository: R, - duplicate_checker: D, - transaction_sender: T, -} - -impl - DefaultClaimer -{ - pub fn new( - repository: R, - duplicate_checker: D, - transaction_sender: T, - ) -> Self { - Self { - repository, - duplicate_checker, - transaction_sender, - } - } -} - -#[async_trait] -impl Claimer for DefaultClaimer -where - R: Repository + Send + Sync + 'static, - D: DuplicateChecker + Send + Sync + 'static, - T: TransactionSender + Send + 'static, -{ - type Error = ClaimerError; - - async fn start(mut self) -> Result<(), Self::Error> { - trace!("Starting the authority claimer loop"); - loop { - let (rollups_claim, iconsensus) = - self.repository.get_claim().await.context(RepositorySnafu)?; - info!("Received claim from the repository: {:?}", rollups_claim); - let tx_hash: H256; - let id = rollups_claim.id; - - let is_duplicated_rollups_claim = self - .duplicate_checker - .is_duplicated_rollups_claim(&rollups_claim, &iconsensus) - .await - .context(DuplicatedClaimSnafu)?; - if is_duplicated_rollups_claim { - info!("Duplicate claim detected: {:?}", rollups_claim); - // Updates the database so the claim leaves the queue - self.repository - .update_claim(id, H256::zero()) - .await - .context(RepositorySnafu)?; - continue; - } - - info!("Sending a new rollups claim transaction"); - (tx_hash, self.transaction_sender) = self - .transaction_sender - .send_rollups_claim_transaction(rollups_claim, iconsensus) - .await - .context(TransactionSenderSnafu)?; - - trace!("Updating claim data in repository"); - self.repository - .update_claim(id, tx_hash) - .await - .context(RepositorySnafu)?; - } - } -} diff --git a/cmd/authority-claimer/src/config.rs b/cmd/authority-claimer/src/config.rs deleted file mode 100644 index 316f3a868..000000000 --- a/cmd/authority-claimer/src/config.rs +++ /dev/null @@ -1,215 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - log::{LogConfig, LogEnvCliConfig}, - redacted::Redacted, - rollups_events::HexArrayError, -}; -use clap::{command, Parser}; -use eth_tx_manager::{ - config::{ - Error as TxManagerConfigError, TxEnvCLIConfig as TxManagerCLIConfig, - TxManagerConfig, - }, - Priority, -}; -use rusoto_core::{region::ParseRegionError, Region}; -use snafu::{ResultExt, Snafu}; -use std::{fs, str::FromStr}; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] -pub enum AuthorityClaimerConfigError { - #[snafu(display("TxManager configuration error"))] - TxManager { source: TxManagerConfigError }, - - #[snafu(display("parse IConsensus address error"))] - ParseIConsensusAddress { source: HexArrayError }, - - #[snafu(display("Missing auth configuration"))] - AuthConfigMissing, - - #[snafu(display("Could not read mnemonic file at path `{}`", path,))] - MnemonicFileError { - path: String, - source: std::io::Error, - }, - - #[snafu(display("Missing AWS region"))] - MissingRegion, - - #[snafu(display("Invalid AWS region"))] - InvalidRegion { source: ParseRegionError }, -} - -#[derive(Debug, Clone)] -pub struct Config { - pub tx_manager_config: TxManagerConfig, - pub tx_signing_config: TxSigningConfig, - pub tx_manager_priority: Priority, - pub log_config: LogConfig, - pub postgres_endpoint: String, - pub polling_interval: u64, - pub genesis_block: u64, - pub http_server_port: u16, -} - -#[derive(Debug, Clone)] -pub enum TxSigningConfig { - PrivateKey { - private_key: Redacted, - }, - - Mnemonic { - mnemonic: Redacted, - account_index: Option, - }, - - Aws { - key_id: String, - region: Region, - }, -} - -impl Config { - pub fn new() -> Result { - let cli_config = AuthorityClaimerCLI::parse(); - - let tx_manager_config = - TxManagerConfig::initialize(cli_config.tx_manager_config) - .context(TxManagerSnafu)?; - - let tx_signing_config = - TxSigningConfig::try_from(cli_config.tx_signing_config)?; - - let log_config = LogConfig::initialize(cli_config.log_config); - - let postgres_endpoint = cli_config.postgres_endpoint; - - let polling_interval = cli_config.polling_interval; - - Ok(Config { - tx_manager_config, - tx_signing_config, - tx_manager_priority: Priority::Normal, - log_config, - postgres_endpoint, - polling_interval, - genesis_block: cli_config.genesis_block, - http_server_port: cli_config.http_server_port, - }) - } -} - -#[derive(Parser)] -#[command(name = "authority_claimer_config")] -#[command(about = "Configuration for authority-claimer")] -struct AuthorityClaimerCLI { - #[command(flatten)] - pub tx_manager_config: TxManagerCLIConfig, - - #[command(flatten)] - pub tx_signing_config: TxSigningCLIConfig, - - #[command(flatten)] - pub log_config: LogEnvCliConfig, - - /// Postgres endpoint address - #[arg(long, env)] - pub postgres_endpoint: String, - - /// Dtatabase polling interval - #[arg(long, env, default_value_t = 7)] - pub polling_interval: u64, - - /// Genesis block for reading blockchain events - #[arg(long, env, default_value_t = 1)] - pub genesis_block: u64, - - /// Port of the authority-claimer HTTP server - #[arg(long, env, default_value_t = 8080)] - pub http_server_port: u16, -} - -#[derive(Debug, Parser)] -#[command(name = "tx_signing_config")] -struct TxSigningCLIConfig { - /// Signer private key, overrides `tx_signing_private_key_file`, `tx_signing_mnemonic` , `tx_signing_mnemonic_file` and `tx_signing_aws_kms_*` - #[arg(long, env)] - tx_signing_private_key: Option, - - /// Signer private key file, overrides `tx_signing_mnemonic` , `tx_signing_mnemonic_file` and `tx_signing_aws_kms_*` - #[arg(long, env)] - tx_signing_private_key_file: Option, - - /// Signer mnemonic, overrides `tx_signing_mnemonic_file` and `tx_signing_aws_kms_*` - #[arg(long, env)] - tx_signing_mnemonic: Option, - - /// Signer mnemonic file path, overrides `tx_signing_aws_kms_*` - #[arg(long, env)] - tx_signing_mnemonic_file: Option, - - /// Mnemonic account index - #[arg(long, env)] - tx_signing_mnemonic_account_index: Option, - - /// AWS KMS signer key-id - #[arg(long, env)] - tx_signing_aws_kms_key_id: Option, - - /// AWS KMS signer region - #[arg(long, env)] - tx_signing_aws_kms_region: Option, -} - -impl TryFrom for TxSigningConfig { - type Error = AuthorityClaimerConfigError; - - fn try_from(cli: TxSigningCLIConfig) -> Result { - let account_index = cli.tx_signing_mnemonic_account_index; - if let Some(private_key) = cli.tx_signing_private_key { - Ok(TxSigningConfig::PrivateKey { - private_key: Redacted::new(private_key), - }) - } else if let Some(path) = cli.tx_signing_private_key_file { - let private_key = fs::read_to_string(path.clone()) - .context(MnemonicFileSnafu { path })? - .trim() - .to_string(); - Ok(TxSigningConfig::PrivateKey { - private_key: Redacted::new(private_key), - }) - } else if let Some(mnemonic) = cli.tx_signing_mnemonic { - Ok(TxSigningConfig::Mnemonic { - mnemonic: Redacted::new(mnemonic), - account_index, - }) - } else if let Some(path) = cli.tx_signing_mnemonic_file { - let mnemonic = fs::read_to_string(path.clone()) - .context(MnemonicFileSnafu { path })? - .trim() - .to_string(); - Ok(TxSigningConfig::Mnemonic { - mnemonic: Redacted::new(mnemonic), - account_index, - }) - } else { - match (cli.tx_signing_aws_kms_key_id, cli.tx_signing_aws_kms_region) - { - (None, _) => { - Err(AuthorityClaimerConfigError::AuthConfigMissing) - } - (Some(_), None) => { - Err(AuthorityClaimerConfigError::MissingRegion) - } - (Some(key_id), Some(region)) => { - let region = Region::from_str(®ion) - .context(InvalidRegionSnafu)?; - Ok(TxSigningConfig::Aws { key_id, region }) - } - } - } - } -} diff --git a/cmd/authority-claimer/src/contracts.rs b/cmd/authority-claimer/src/contracts.rs deleted file mode 100644 index 9869dbcb9..000000000 --- a/cmd/authority-claimer/src/contracts.rs +++ /dev/null @@ -1,18 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -/// Declares `$contract_name` as a module and includes everything from the `$contract_name` ABI. -macro_rules! contract { - ($contract_name: ident) => { - pub mod $contract_name { - include!(concat!( - env!("OUT_DIR"), - "/", - stringify!($contract_name), - ".rs" - )); - } - }; -} - -contract!(iconsensus); diff --git a/cmd/authority-claimer/src/http_server.rs b/cmd/authority-claimer/src/http_server.rs deleted file mode 100644 index 727ee948e..000000000 --- a/cmd/authority-claimer/src/http_server.rs +++ /dev/null @@ -1,46 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -// Re-exporting prometheus' Registry. -pub use prometheus_client::registry::Registry; - -// Re-exporting prometheus metrics. -// Add any other metrics to re-export here. -pub use prometheus_client::metrics::counter::Counter as CounterRef; -pub use prometheus_client::metrics::family::Family as FamilyRef; -// End of metrics to re-export. - -use axum::{routing::get, Router}; -use prometheus_client::encoding::text::encode; -use std::{ - net::SocketAddr, - sync::{Arc, Mutex}, -}; - -/// Starts a HTTP server with two endpoints: /healthz and /metrics. -/// -/// The `Registry` parameter is a `prometheus` type used for metric tracking. -pub async fn start( - port: u16, - registry: Registry, -) -> Result<(), std::io::Error> { - let ip = "0.0.0.0".parse().expect("could not parse host address"); - let addr = SocketAddr::new(ip, port); - tracing::info!("Starting HTTP server at {}", addr); - - let registry = Arc::new(Mutex::new(registry)); - let router = Router::new() - .route("/healthz", get(|| async { "" })) - .route("/metrics", get(|| get_metrics(registry))); - - let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, router).await -} - -/// Returns the metrics as a specially encoded string. -async fn get_metrics(registry: Arc>) -> String { - let registry = registry.lock().unwrap(); - let mut buffer = String::new(); - encode(&mut buffer, ®istry).unwrap(); - buffer -} diff --git a/cmd/authority-claimer/src/lib.rs b/cmd/authority-claimer/src/lib.rs deleted file mode 100644 index 088742ac3..000000000 --- a/cmd/authority-claimer/src/lib.rs +++ /dev/null @@ -1,82 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -mod checker; -mod claimer; -mod config; -mod contracts; -mod http_server; -pub mod log; -mod metrics; -mod redacted; -mod repository; -mod rollups_events; -mod sender; -mod signer; - -use checker::DefaultDuplicateChecker; -use claimer::{Claimer, DefaultClaimer}; -pub use config::Config; -use ethers::signers::Signer; -use metrics::AuthorityClaimerMetrics; -use repository::DefaultRepository; -use sender::DefaultTransactionSender; -use signer::ConditionalSigner; -use snafu::Error; -use tracing::trace; - -pub async fn run(config: Config) -> Result<(), Box> { - // Creating the metrics and health server. - let metrics = AuthorityClaimerMetrics::new(); - let http_server_handle = - http_server::start(config.http_server_port, metrics.clone().into()); - - let chain_id = config.tx_manager_config.chain_id; - - // Creating repository. - trace!("Creating the repository"); - let repository = DefaultRepository::new( - config.postgres_endpoint, - config.polling_interval, - )?; - - // Creating the conditional signer. - let conditional_signer = - ConditionalSigner::new(chain_id, &config.tx_signing_config).await?; - let from = conditional_signer.address(); - - // Creating the duplicate checker. - trace!("Creating the duplicate checker"); - let duplicate_checker = DefaultDuplicateChecker::new( - config.tx_manager_config.provider_http_endpoint.clone(), - from, - config.tx_manager_config.default_confirmations, - config.genesis_block, - ) - .await?; - - // Creating the transaction sender. - trace!("Creating the transaction sender"); - let transaction_sender = DefaultTransactionSender::new( - config.tx_manager_config, - config.tx_manager_priority, - conditional_signer, - from, - chain_id, - metrics, - ) - .await?; - - // Creating the claimer loop. - let claimer = - DefaultClaimer::new(repository, duplicate_checker, transaction_sender); - let claimer_handle = claimer.start(); - - // Starting the HTTP server and the claimer loop. - tokio::select! { - ret = http_server_handle => { ret? } - ret = claimer_handle => { ret? } - }; - - unreachable!() -} diff --git a/cmd/authority-claimer/src/log.rs b/cmd/authority-claimer/src/log.rs deleted file mode 100644 index 5f2d6be72..000000000 --- a/cmd/authority-claimer/src/log.rs +++ /dev/null @@ -1,64 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use clap::Parser; -use std::fmt::Debug; -use tracing::info; -use tracing_subscriber::filter::{EnvFilter, LevelFilter}; - -#[derive(Debug, Parser)] -#[command(name = "log_config")] -pub struct LogEnvCliConfig { - #[arg(long, env, default_value = "false")] - pub log_enable_timestamp: bool, - - #[arg(long, env, default_value = "false")] - pub log_enable_color: bool, -} - -#[derive(Clone, Debug, Default)] -pub struct LogConfig { - pub enable_timestamp: bool, - pub enable_color: bool, -} - -impl LogConfig { - pub fn initialize(env_cli_config: LogEnvCliConfig) -> Self { - let enable_timestamp = env_cli_config.log_enable_timestamp; - - let enable_color = env_cli_config.log_enable_color; - - LogConfig { - enable_timestamp, - enable_color, - } - } -} - -impl From for LogConfig { - fn from(cli_config: LogEnvCliConfig) -> LogConfig { - LogConfig::initialize(cli_config) - } -} - -pub fn configure(config: &LogConfig) { - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - - let subscribe_builder = tracing_subscriber::fmt() - .compact() - .with_env_filter(filter) - .with_ansi(config.enable_color); - - if !config.enable_timestamp { - subscribe_builder.without_time().init(); - } else { - subscribe_builder.init(); - } -} - -pub fn log_service_start(config: &C, service: &str) { - let message = format!("Starting {} with config {:?}", service, config); - info!(message); -} diff --git a/cmd/authority-claimer/src/main.rs b/cmd/authority-claimer/src/main.rs deleted file mode 100644 index c628a776c..000000000 --- a/cmd/authority-claimer/src/main.rs +++ /dev/null @@ -1,19 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use authority_claimer::{log, Config}; -use std::error::Error; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Getting the configuration. - let config: Config = Config::new().map_err(Box::new)?; - - // Setting up the logging environment. - log::configure(&config.log_config); - - //Log Service info - log::log_service_start(&config, "Authority Claimer"); - - authority_claimer::run(config).await -} diff --git a/cmd/authority-claimer/src/metrics.rs b/cmd/authority-claimer/src/metrics.rs deleted file mode 100644 index 1167b894f..000000000 --- a/cmd/authority-claimer/src/metrics.rs +++ /dev/null @@ -1,36 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - http_server::{CounterRef, FamilyRef, Registry}, - rollups_events::DAppMetadata, -}; - -const METRICS_PREFIX: &str = "cartesi_rollups_authority_claimer"; - -fn prefixed_metrics(name: &str) -> String { - format!("{}_{}", METRICS_PREFIX, name) -} - -#[derive(Debug, Clone, Default)] -pub struct AuthorityClaimerMetrics { - pub claims_sent: FamilyRef, -} - -impl AuthorityClaimerMetrics { - pub fn new() -> Self { - Self::default() - } -} - -impl From for Registry { - fn from(metrics: AuthorityClaimerMetrics) -> Self { - let mut registry = Registry::default(); - registry.register( - prefixed_metrics("claims_sent"), - "Counts the number of claims sent", - metrics.claims_sent, - ); - registry - } -} diff --git a/cmd/authority-claimer/src/redacted.rs b/cmd/authority-claimer/src/redacted.rs deleted file mode 100644 index 476ac619b..000000000 --- a/cmd/authority-claimer/src/redacted.rs +++ /dev/null @@ -1,118 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use std::fmt; -pub use url::{self, Url}; - -/// Wrapper that redacts the entire field -#[derive(Clone)] -pub struct Redacted(T); - -impl Redacted { - pub fn new(data: T) -> Redacted { - Self(data) - } - - pub fn inner(&self) -> &T { - &self.0 - } - - pub fn into_inner(self) -> T { - self.0 - } -} - -impl fmt::Debug for Redacted { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[REDACTED]") - } -} - -#[test] -fn redacts_debug_fmt() { - let password = Redacted::new("super-security"); - assert_eq!(format!("{:?}", password), "[REDACTED]"); -} - -/// Wrapper that redacts the credentials in an URL -#[derive(Clone)] -pub struct RedactedUrl(Url); - -#[allow(dead_code)] -impl RedactedUrl { - pub fn new(url: Url) -> Self { - Self(url) - } - - pub fn inner(&self) -> &Url { - &self.0 - } - - pub fn into_inner(self) -> Url { - self.0 - } -} - -impl fmt::Debug for RedactedUrl { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut url = self.inner().clone(); - let result = { - if url.cannot_be_a_base() { - Err(()) - } else { - Ok(()) - } - } - .and_then(|_| { - if !url.username().is_empty() { - url.set_username("***") - } else { - Ok(()) - } - }) - .and_then(|_| { - if url.password().is_some() { - url.set_password(Some("***")) - } else { - Ok(()) - } - }); - match result { - Ok(_) => write!(f, "{}", url.as_str()), - Err(_) => write!(f, "[NON-BASE URL REDACTED]"), - } - } -} - -#[test] -fn redacts_valid_url_without_credentials() { - let url = RedactedUrl::new(Url::parse("http://example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://example.com/"); -} - -#[test] -fn redacts_valid_url_with_username() { - let url = - RedactedUrl::new(Url::parse("http://james@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://***@example.com/"); -} - -#[test] -fn redacts_valid_url_with_password() { - let url = - RedactedUrl::new(Url::parse("http://:bond@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://:***@example.com/"); -} - -#[test] -fn redacts_valid_url_with_full_credentials() { - let url = - RedactedUrl::new(Url::parse("http://james:bond@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://***:***@example.com/"); -} - -#[test] -fn redacts_non_base_url() { - let url = RedactedUrl::new(Url::parse("james:bond@example.com").unwrap()); - assert_eq!(format!("{:?}", url), "[NON-BASE URL REDACTED]"); -} diff --git a/cmd/authority-claimer/src/repository.rs b/cmd/authority-claimer/src/repository.rs deleted file mode 100644 index 7f907272d..000000000 --- a/cmd/authority-claimer/src/repository.rs +++ /dev/null @@ -1,215 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use async_trait::async_trait; -use ethers::types::H256; -use sea_query::{Alias, Expr, Func, Iden, Order, PostgresQueryBuilder, Query}; -use sea_query_binder::SqlxBinder; -use snafu::{ResultExt, Snafu}; -use sqlx::{ - pool::PoolConnection, postgres::PgPoolOptions, types::Decimal, Pool, - Postgres, -}; -use std::sync::Arc; -use std::{fmt::Debug, time::Duration}; -use tokio::time::sleep; - -use crate::rollups_events::{Address, Hash, RollupsClaim}; - -const REPOSITORY_MIN_CONNECTIONS: u32 = 2; -const REPOSITORY_MAX_CONNECTIONS: u32 = 10; -const REPOSITORY_ACQUIRE_TIMEOUT: Duration = Duration::new(15, 0); - -/// The `Repository` queries the database and gets an unsubmitted claim -#[async_trait] -pub trait Repository: Debug { - type Error: snafu::Error + 'static; - - async fn get_claim( - &mut self, - ) -> Result<(RollupsClaim, Address), Self::Error>; - - async fn update_claim( - &mut self, - id: u64, - tx_hash: H256, - ) -> Result<(), Self::Error>; -} - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub(super)))] -pub enum RepositoryError { - #[snafu(display("database error"))] - DatabaseSqlx { source: sqlx::Error }, - - #[snafu(display("int conversion error"))] - IntConversion { source: std::num::TryFromIntError }, -} - -#[derive(Clone, Debug)] -pub struct DefaultRepository { - // Connection is not thread-safe, we use a connection pool - db_pool: Arc>, - // Polling interval in seconds - polling_interval: u64, -} - -impl DefaultRepository { - /// Create database connection pool, wait until database server is available - pub fn new( - endpoint: String, - polling_interval: u64, - ) -> Result { - let connection = PgPoolOptions::new() - .acquire_timeout(REPOSITORY_ACQUIRE_TIMEOUT) - .min_connections(REPOSITORY_MIN_CONNECTIONS) - .max_connections(REPOSITORY_MAX_CONNECTIONS) - .connect_lazy(&endpoint) - .context(DatabaseSqlxSnafu)?; - Ok(Self { - db_pool: Arc::new(connection), - polling_interval, - }) - } - - /// Obtain a connection from the connection pool - async fn conn(&self) -> PoolConnection { - self.db_pool - .acquire() - .await - .expect("No connections available in the pool") - } -} - -#[async_trait] -impl Repository for DefaultRepository { - type Error = RepositoryError; - - async fn get_claim( - &mut self, - ) -> Result<(RollupsClaim, Address), Self::Error> { - let claim: RollupsClaim; - let iconsensus_address: Address; - let mut conn = self.conn().await; - let (sql, values) = Query::select() - .columns([ - (Epoch::Table, Epoch::Id), - (Epoch::Table, Epoch::ClaimHash), - (Epoch::Table, Epoch::ApplicationAddress), - (Epoch::Table, Epoch::LastBlock), - ]) - .column((Application::Table, Application::IconsensusAddress)) - .from(Epoch::Table) - .inner_join( - Application::Table, - Expr::col((Epoch::Table, Epoch::ApplicationAddress)) - .equals((Application::Table, Application::ContractAddress)), - ) - .and_where(Expr::col((Epoch::Table, Epoch::Status)).eq( - Func::cast_as("CLAIM_COMPUTED", Alias::new("\"EpochStatus\"")), - )) - .order_by(Epoch::Index, Order::Asc) - .order_by((Epoch::Table, Epoch::Id), Order::Asc) - .limit(1) - .build_sqlx(PostgresQueryBuilder); - - loop { - let result = sqlx::query_as_with::<_, QueryResponse, _>( - &sql, - values.clone(), - ) - .fetch_optional(&mut *conn) - .await - .context(DatabaseSqlxSnafu)?; - - match result { - Some(row) => { - claim = RollupsClaim { - id: row.id.try_into().context(IntConversionSnafu)?, - last_block: row.last_block.try_into().unwrap(), - dapp_address: Address::new( - row.application_address.try_into().unwrap(), - ), - output_merkle_root_hash: Hash::new( - row.claim_hash.try_into().unwrap(), - ), - }; - iconsensus_address = Address::new( - row.iconsensus_address.try_into().unwrap(), - ); - break; - } - None => { - sleep(Duration::from_secs(self.polling_interval)).await; - continue; - } - } - } - - let _ = conn.close().await.context(DatabaseSqlxSnafu)?; - Ok((claim, iconsensus_address)) - } - - async fn update_claim( - &mut self, - id: u64, - tx_hash: H256, - ) -> Result<(), Self::Error> { - let mut conn = self.conn().await; - let (sql, values) = Query::update() - .table(Epoch::Table) - .values([ - ( - Epoch::Status, - Func::cast_as( - "CLAIM_SUBMITTED", - Alias::new("\"EpochStatus\""), - ) - .into(), - ), - ( - Epoch::TransactionHash, - tx_hash.as_fixed_bytes().to_vec().into(), - ), - ]) - .and_where(Expr::col(Epoch::Id).eq(id)) - .build_sqlx(PostgresQueryBuilder); - - let _result = sqlx::query_with(&sql, values) - .execute(&mut *conn) - .await - .context(DatabaseSqlxSnafu)?; - - let _ = conn.close().await.context(DatabaseSqlxSnafu)?; - Ok(()) - } -} - -#[derive(Iden)] -enum Epoch { - Table, - Id, - Index, - Status, - LastBlock, - ClaimHash, - TransactionHash, - ApplicationAddress, -} - -#[derive(Iden)] -enum Application { - Table, - ContractAddress, - IconsensusAddress, -} - -#[derive(sqlx::FromRow, Debug, Clone)] -#[allow(dead_code)] -struct QueryResponse { - id: i64, - last_block: Decimal, - claim_hash: Vec, - application_address: Vec, - iconsensus_address: Vec, -} diff --git a/cmd/authority-claimer/src/rollups_events/common.rs b/cmd/authority-claimer/src/rollups_events/common.rs deleted file mode 100644 index 47b2cda9e..000000000 --- a/cmd/authority-claimer/src/rollups_events/common.rs +++ /dev/null @@ -1,234 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use base64::{engine::general_purpose::STANDARD as base64_engine, Engine as _}; -use prometheus_client::encoding::{EncodeLabelValue, LabelValueEncoder}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use snafu::{ResultExt, Snafu}; -use std::fmt::Write; - -pub const ADDRESS_SIZE: usize = 20; -pub const HASH_SIZE: usize = 32; - -const PAYLOAD_DEBUG_MAX_LEN: usize = 100; - -/// A binary array that is converted to a hex string when serialized -#[derive(Clone, Hash, Eq, PartialEq)] -pub struct HexArray([u8; N]); - -impl HexArray { - pub const fn new(data: [u8; N]) -> Self { - Self(data) - } - - pub fn inner(&self) -> &[u8; N] { - &self.0 - } - - pub fn mut_inner(&mut self) -> &mut [u8; N] { - &mut self.0 - } - - pub fn into_inner(self) -> [u8; N] { - self.0 - } -} - -impl From<[u8; N]> for HexArray { - fn from(data: [u8; N]) -> Self { - Self::new(data) - } -} - -#[derive(Debug, Snafu)] -pub enum HexArrayError { - #[snafu(display("hex decode error"))] - HexDecode { source: hex::FromHexError }, - - #[snafu(display("incorrect array size"))] - ArraySize, -} - -impl TryFrom for HexArray { - type Error = HexArrayError; - - fn try_from(mut string_data: String) -> Result { - // The hex crate doesn't decode '0x' at the start, so we treat the value before decoding - if string_data[..2].eq("0x") { - string_data.drain(..2); - } - let vec_data = hex::decode(string_data).context(HexDecodeSnafu)?; - let data = vec_data.try_into().or(Err(HexArrayError::ArraySize))?; - Ok(Self::new(data)) - } -} - -impl Serialize for HexArray { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - String::serialize(&hex::encode(self.inner()), serializer) - } -} - -impl<'de, const N: usize> Deserialize<'de> for HexArray { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - String::deserialize(deserializer)?.try_into().map_err(|e| { - serde::de::Error::custom(format!("fail to decode hex ({})", e)) - }) - } -} - -impl std::fmt::Debug for HexArray { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex::encode(self.inner())) - } -} - -impl Default for HexArray { - fn default() -> Self { - Self::new([0; N]) - } -} - -impl EncodeLabelValue for HexArray { - fn encode( - &self, - encoder: &mut LabelValueEncoder<'_>, - ) -> Result<(), std::fmt::Error> { - write!(encoder, "{}", hex::encode(self.inner())) - } -} - -/// Blockchain hash -pub type Hash = HexArray; - -/// Blockchain address -pub type Address = HexArray; - -/// Rollups payload. -/// When serialized, it is converted to a base64 string -#[derive(Default, Clone, Eq, PartialEq)] -pub struct Payload(Vec); - -impl Payload { - pub const fn new(data: Vec) -> Self { - Self(data) - } - - pub fn inner(&self) -> &Vec { - &self.0 - } -} - -impl From> for Payload { - fn from(data: Vec) -> Self { - Self::new(data) - } -} - -impl Serialize for Payload { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - String::serialize(&base64_engine.encode(self.inner()), serializer) - } -} - -impl<'de> Deserialize<'de> for Payload { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let string_data = String::deserialize(deserializer)?; - let data = base64_engine.decode(string_data).map_err(|e| { - serde::de::Error::custom(format!("fail to decode base64 ({})", e)) - })?; - Ok(Payload::new(data)) - } -} - -impl std::fmt::Debug for Payload { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let len = self.inner().len(); - if len > PAYLOAD_DEBUG_MAX_LEN { - let slice = &self.inner().as_slice()[0..PAYLOAD_DEBUG_MAX_LEN]; - write!( - f, - "{}...[total: {} bytes]", - base64_engine.encode(slice), - len - ) - } else { - write!(f, "{}", base64_engine.encode(self.inner())) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn serialize_array() { - assert_eq!( - serde_json::to_string(&Hash::new([0xfa; HASH_SIZE])).unwrap(), - r#""fafafafafafafafafafafafafafafafafafafafafafafafafafafafafafafafa""# - ); - } - - #[test] - fn deserialize_array() { - assert_eq!( - serde_json::from_str::( - r#""fafafafafafafafafafafafafafafafafafafafafafafafafafafafafafafafa""#).unwrap(), - Hash::new([0xfa; HASH_SIZE]) - ); - } - - #[test] - fn fail_to_deserialized_invalid_array() { - assert!(serde_json::from_str::("\"....\"") - .unwrap_err() - .to_string() - .contains("fail to decode hex")); - } - - #[test] - fn fail_to_deserialized_array_with_wrong_size() { - assert!(serde_json::from_str::("\"ff\"") - .unwrap_err() - .to_string() - .contains("incorrect array size")); - } - - #[test] - fn serialize_payload() { - assert_eq!( - serde_json::to_string(&Payload::new(vec![0xfa; 20])).unwrap(), - "\"+vr6+vr6+vr6+vr6+vr6+vr6+vo=\"" - ); - } - - #[test] - fn deserialize_payload() { - assert_eq!( - serde_json::from_str::("\"+vr6+vr6+vr6+vr6+vr6+vr6+vo=\"") - .unwrap(), - Payload::new(vec![0xfa; 20]) - ); - } - - #[test] - fn fail_to_deserialized_invalid_payload() { - assert!(serde_json::from_str::("\".\"") - .unwrap_err() - .to_string() - .contains("fail to decode base64")); - } -} diff --git a/cmd/authority-claimer/src/rollups_events/mod.rs b/cmd/authority-claimer/src/rollups_events/mod.rs deleted file mode 100644 index 60ad85f27..000000000 --- a/cmd/authority-claimer/src/rollups_events/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -pub mod common; -pub mod rollups_claims; -pub mod rollups_stream; - -pub use common::{Address, Hash, HexArrayError}; -pub use rollups_claims::RollupsClaim; -pub use rollups_stream::DAppMetadata; diff --git a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs deleted file mode 100644 index e2de524c1..000000000 --- a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs +++ /dev/null @@ -1,21 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use super::{Address, Hash}; -use serde::{Deserialize, Serialize}; - -/// Event generated when the Cartesi Rollups epoch finishes -#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct RollupsClaim { - // Claim id - pub id: u64, - - // Last block processed in the claim - pub last_block: u64, - - // DApp address - pub dapp_address: Address, - - /// Hash of the output merkle root - pub output_merkle_root_hash: Hash, -} diff --git a/cmd/authority-claimer/src/rollups_events/rollups_stream.rs b/cmd/authority-claimer/src/rollups_events/rollups_stream.rs deleted file mode 100644 index 732fef5f6..000000000 --- a/cmd/authority-claimer/src/rollups_events/rollups_stream.rs +++ /dev/null @@ -1,64 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use super::Address; -use clap::Parser; -use prometheus_client::encoding::EncodeLabelSet; -use serde_json::Value; -use std::{fs::File, io::BufReader}; - -/// DApp metadata used to define the stream keys -#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, EncodeLabelSet)] -pub struct DAppMetadata { - pub chain_id: u64, - pub dapp_address: Address, -} - -/// CLI configuration used to generate the DApp metadata -#[derive(Debug, Parser)] -pub struct DAppMetadataCLIConfig { - /// Chain identifier - #[arg(long, env, default_value = "0")] - chain_id: u64, - - /// Address of rollups dapp - #[arg(long, env)] - dapp_contract_address: Option, - - /// Path to file with address of rollups dapp - #[arg(long, env)] - dapp_contract_address_file: Option, -} - -impl From for DAppMetadata { - fn from(cli_config: DAppMetadataCLIConfig) -> DAppMetadata { - let dapp_contract_address_raw = match cli_config.dapp_contract_address { - Some(address) => address, - None => { - let path = cli_config - .dapp_contract_address_file - .expect("Configuration missing dapp address"); - let file = File::open(path).expect("Dapp json read file error"); - let reader = BufReader::new(file); - let mut json: Value = serde_json::from_reader(reader) - .expect("Dapp json parse error"); - match json["address"].take() { - Value::String(s) => s, - Value::Null => panic!("Configuration missing dapp address"), - _ => panic!("Dapp json wrong type error"), - } - } - }; - - let dapp_contract_address: [u8; 20] = - hex::decode(&dapp_contract_address_raw[2..]) - .expect("Dapp json parse error") - .try_into() - .expect("Dapp address with wrong size"); - - DAppMetadata { - chain_id: cli_config.chain_id, - dapp_address: dapp_contract_address.into(), - } - } -} diff --git a/cmd/authority-claimer/src/sender.rs b/cmd/authority-claimer/src/sender.rs deleted file mode 100644 index f682737e5..000000000 --- a/cmd/authority-claimer/src/sender.rs +++ /dev/null @@ -1,231 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - contracts::iconsensus::IConsensus, - metrics::AuthorityClaimerMetrics, - rollups_events::{Address, DAppMetadata, RollupsClaim}, - signer::ConditionalSigner, -}; -use async_trait::async_trait; -use eth_tx_manager::{ - config::TxManagerConfig, - database::FileSystemDatabase as Database, - gas_oracle::DefaultGasOracle as GasOracle, - manager::Configuration, - time::DefaultTime as Time, - transaction::{Priority, Transaction, Value}, - Chain, -}; -use ethers::{ - self, - middleware::SignerMiddleware, - providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient}, - types::{NameOrAddress, H160, H256, U256}, -}; -use snafu::{OptionExt, ResultExt, Snafu}; -use std::{fmt::Debug, sync::Arc}; -use tracing::{debug, info, trace}; -use url::{ParseError, Url}; - -/// The `TransactionSender` sends claims to the blockchain. -/// -/// It should wait for N blockchain confirmations. -#[async_trait] -pub trait TransactionSender: Sized + Debug { - type Error: snafu::Error + 'static; - - /// The `send_rollups_claim_transaction` function consumes the - /// `TransactionSender` object and then returns it to avoid - /// that processes use the transaction sender concurrently. - async fn send_rollups_claim_transaction( - self, - rollups_claim: RollupsClaim, - iconsensus: Address, - ) -> Result<(H256, Self), Self::Error>; -} - -// ------------------------------------------------------------------------------------------------ -// DefaultTransactionSender -// ------------------------------------------------------------------------------------------------ - -type Middleware = - Arc>, ConditionalSigner>>; - -type TransactionManager = - eth_tx_manager::TransactionManager; - -type TransactionManagerError = - eth_tx_manager::Error; - -/// Instantiates the tx-manager calling `new` or `force_new`. -macro_rules! tx_manager { - ($new: ident, $middleware: expr, $database_path: expr, $chain: expr) => { - TransactionManager::$new( - $middleware.clone(), - GasOracle::new(), - Database::new($database_path.clone()), - $chain, - Configuration::default(), - ) - .await - }; -} - -#[derive(Debug)] -pub struct DefaultTransactionSender { - tx_manager: TransactionManager, - confirmations: usize, - priority: Priority, - from: ethers::types::Address, - chain_id: u64, - metrics: AuthorityClaimerMetrics, -} - -#[derive(Debug, Snafu)] -pub enum TransactionSenderError { - #[snafu(display("Invalid provider URL"))] - ProviderUrl { source: ParseError }, - - #[snafu(display("Transaction manager error"))] - TransactionManager { source: TransactionManagerError }, - - #[snafu(display("Internal ethers-rs error: tx `to` should not be null"))] - InternalEthers, - - #[snafu(display( - "Internal configuration error: expected address, found ENS name" - ))] - InternalConfig, -} - -/// Creates the (layered) middleware instance to be sent to the tx-manager. -fn create_middleware( - conditional_signer: ConditionalSigner, - provider_url: String, -) -> Result { - const MAX_RETRIES: u32 = 10; - const INITIAL_BACKOFF: u64 = 1000; - let url = Url::parse(&provider_url).context(ProviderUrlSnafu)?; - let base_layer = Http::new(url); - let retry_layer = Provider::new(RetryClient::new( - base_layer, - Box::new(HttpRateLimitRetryPolicy), - MAX_RETRIES, - INITIAL_BACKOFF, - )); - let signer_layer = SignerMiddleware::new(retry_layer, conditional_signer); - Ok(Arc::new(signer_layer)) -} - -/// Creates the tx-manager instance. -/// NOTE: tries to re-instantiate the tx-manager only once. -async fn create_tx_manager( - conditional_signer: ConditionalSigner, - provider_url: String, - database_path: String, - chain: Chain, -) -> Result { - let middleware = create_middleware(conditional_signer, provider_url)?; - let result = tx_manager!(new, middleware, database_path, chain); - let tx_manager = - if let Err(TransactionManagerError::NonceTooLow { .. }) = result { - info!("Nonce too low! Clearing the tx-manager database."); - tx_manager!(force_new, middleware, database_path, chain) - .context(TransactionManagerSnafu)? - } else { - let (tx_manager, receipt) = - result.context(TransactionManagerSnafu)?; - trace!("Database claim transaction confirmed: `{:?}`", receipt); - tx_manager - }; - Ok(tx_manager) -} - -impl DefaultTransactionSender { - pub async fn new( - tx_manager_config: TxManagerConfig, - tx_manager_priority: Priority, - conditional_signer: ConditionalSigner, - from: ethers::types::Address, - chain_id: u64, - metrics: AuthorityClaimerMetrics, - ) -> Result { - let chain: Chain = (&tx_manager_config).into(); - - let tx_manager = create_tx_manager( - conditional_signer, - tx_manager_config.provider_http_endpoint.clone(), - tx_manager_config.database_path.clone(), - chain, - ) - .await?; - - Ok(Self { - tx_manager, - confirmations: tx_manager_config.default_confirmations, - priority: tx_manager_priority, - from, - chain_id, - metrics, - }) - } -} - -#[async_trait] -impl TransactionSender for DefaultTransactionSender { - type Error = TransactionSenderError; - - async fn send_rollups_claim_transaction( - self, - rollups_claim: RollupsClaim, - iconsensus: Address, - ) -> Result<(H256, Self), Self::Error> { - let dapp_address = rollups_claim.dapp_address.clone(); - - let iconsensus = { - let (provider, _mock) = Provider::mocked(); - let provider = Arc::new(provider); - let address: H160 = iconsensus.into_inner().into(); - IConsensus::new(address, provider) - }; - - let transaction = { - let call = iconsensus - .submit_claim( - H160(dapp_address.inner().to_owned()), - U256([rollups_claim.last_block, 0, 0, 0]), - rollups_claim.output_merkle_root_hash.into_inner(), - ) - .from(self.from); - let to = match call.tx.to().context(InternalEthersSnafu)? { - NameOrAddress::Address(a) => *a, - _ => return Err(TransactionSenderError::InternalConfig), - }; - Transaction { - from: self.from, - to, - value: Value::Nothing, - call_data: call.tx.data().cloned(), - } - }; - - trace!("Built claim transaction: `{:?}`", transaction); - - let (tx_manager, receipt) = self - .tx_manager - .send_transaction(transaction, self.confirmations, self.priority) - .await - .context(TransactionManagerSnafu)?; - self.metrics - .claims_sent - .get_or_create(&DAppMetadata { - chain_id: self.chain_id, - dapp_address, - }) - .inc(); - debug!("Claim transaction confirmed: `{:?}`", receipt); - - Ok((receipt.transaction_hash, Self { tx_manager, ..self })) - } -} diff --git a/cmd/authority-claimer/src/signer/aws_credentials.rs b/cmd/authority-claimer/src/signer/aws_credentials.rs deleted file mode 100644 index 90ec444f0..000000000 --- a/cmd/authority-claimer/src/signer/aws_credentials.rs +++ /dev/null @@ -1,143 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use async_trait::async_trait; -use rusoto_core::credential::{ - AwsCredentials, CredentialsError, DefaultCredentialsProvider, - ProvideAwsCredentials, -}; -use rusoto_sts::WebIdentityProvider; -use std::env; - -pub const WEB_IDENTITY_ENV_VARS: [&str; 3] = [ - "AWS_WEB_IDENTITY_TOKEN_FILE", - "AWS_ROLE_ARN", - "AWS_ROLE_SESSION_NAME", -]; - -/// The `AwsCredentialsProvider` wraps around a `ProvideAwsCredentials` -/// trait object, and reimplements the `ProvideAwsCredentials` trait. -/// -/// The underlying implementation can be either a `rusoto_sts::WebIdentityProvider` -/// or a `rusoto_core::credential::DefaultCredentialsProvider`. It prioritizes -/// instantiating a `WebIdentityProvider` if the correct environment variables -/// are set. -pub struct AwsCredentialsProvider(Box); - -#[async_trait] -impl ProvideAwsCredentials for AwsCredentialsProvider { - async fn credentials(&self) -> Result { - self.0.credentials().await - } -} - -impl AwsCredentialsProvider { - pub fn new() -> Result { - for env_var in WEB_IDENTITY_ENV_VARS { - if env::var(env_var).is_err() { - tracing::trace!("ENV VAR {} is not set", env_var); - tracing::trace!("instantiating default provider"); - return DefaultCredentialsProvider::new() - .map(Box::new) - .map(|inner| Self(inner)); - } - } - tracing::trace!("instantiating web identity provider"); - Ok(Self(Box::new(WebIdentityProvider::from_k8s_env()))) - } -} - -#[cfg(test)] -pub mod tests { - use serial_test::serial; - use std::env; - use tracing_test::traced_test; - - use crate::signer::aws_credentials::{ - AwsCredentialsProvider, WEB_IDENTITY_ENV_VARS, - }; - - // -------------------------------------------------------------------------------------------- - // new - // These and any other tests that use credential vars are #[serial] - // because there might be ENV VAR concurrency issues if they run - // in parallel. - // -------------------------------------------------------------------------------------------- - - #[test] - #[serial] - #[traced_test] - fn new_default_provider_when_one_web_identity_var_is_missing() { - for i in 0..3 { - clean_web_identity_vars(); - set_web_identity_vars(); - remove_web_identity_var(i); - let result = AwsCredentialsProvider::new(); - assert!(result.is_ok()); - assert!(!logs_contain("instantiating web identity provider")); - assert!(logs_contain("instantiating default provider")); - } - } - - #[test] - #[serial] - #[traced_test] - fn new_default_provider_when_two_web_identity_vars_are_missing() { - for i in 0..3 { - clean_web_identity_vars(); - set_web_identity_var(i); - let result = AwsCredentialsProvider::new(); - assert!(result.is_ok()); - assert!(!logs_contain("instantiating web identity provider")); - assert!(logs_contain("instantiating default provider")); - } - } - - #[test] - #[serial] - #[traced_test] - fn new_default_provider_when_all_web_identity_vars_are_missing() { - clean_web_identity_vars(); - let result = AwsCredentialsProvider::new(); - assert!(result.is_ok()); - assert!(!logs_contain("instantiating web identity provider")); - assert!(logs_contain("instantiating default provider")); - } - - #[test] - #[serial] - #[traced_test] - fn new_web_identity_provider_when_no_web_identity_vars_are_missing() { - clean_web_identity_vars(); - set_web_identity_vars(); - let result = AwsCredentialsProvider::new(); - assert!(result.is_ok()); - assert!(!logs_contain("instantiating default provider")); - assert!(logs_contain("instantiating web identity provider")); - } - - // -------------------------------------------------------------------------------------------- - // env vars - // Used by other tests. - // -------------------------------------------------------------------------------------------- - - fn set_web_identity_var(i: usize) { - env::set_var(WEB_IDENTITY_ENV_VARS[i], "irrelevant"); - } - - fn set_web_identity_vars() { - for env_var in WEB_IDENTITY_ENV_VARS { - env::set_var(env_var, "irrelevant"); - } - } - - fn clean_web_identity_vars() { - for env_var in WEB_IDENTITY_ENV_VARS { - env::remove_var(env_var); - } - } - - fn remove_web_identity_var(i: usize) { - env::remove_var(WEB_IDENTITY_ENV_VARS[i]); - } -} diff --git a/cmd/authority-claimer/src/signer/aws_signer.rs b/cmd/authority-claimer/src/signer/aws_signer.rs deleted file mode 100644 index 49982a538..000000000 --- a/cmd/authority-claimer/src/signer/aws_signer.rs +++ /dev/null @@ -1,131 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use super::aws_credentials::AwsCredentialsProvider; -use async_trait::async_trait; -use ethers::{ - signers::{AwsSigner as InnerAwsSigner, AwsSignerError, Signer}, - types::{ - transaction::{eip2718::TypedTransaction, eip712::Eip712}, - Address, Signature, - }, -}; -use rusoto_core::{HttpClient, Region}; -use rusoto_kms::KmsClient; - -/// The `AwsSigner` (re)implements the `Signer` trait for the `InnerAwsSigner`. -/// -/// We do not use an `InnerAwsSigner` directly because of lifetime and -/// borrow restrictions imposed by the underlying libraries. -/// -/// Instead, we instantiate a new `InnerAwsSigner` every time we call -/// a function from `Signer`. -#[derive(Debug, Clone)] -pub struct AwsSigner { - region: Region, - key_id: String, - chain_id: u64, - address: Address, -} - -/// Creates a `KmsClient` instance. -fn create_kms(region: &Region) -> KmsClient { - let request_dispatcher = HttpClient::new().expect("http client TLS error"); - let region = region.clone(); - let credentials_provider = AwsCredentialsProvider::new() - .expect("could not instantiate AWS credentials provider"); - KmsClient::new_with(request_dispatcher, credentials_provider, region) -} - -impl AwsSigner { - pub async fn new( - key_id: String, - chain_id: u64, - region: Region, - ) -> Result { - let kms = create_kms(®ion); - let aws_signer = - InnerAwsSigner::new(&kms, key_id.clone(), chain_id).await?; - Ok(Self { - region, - key_id, - chain_id, - address: aws_signer.address(), - }) - } -} - -/// Calls the async `$method` from an `InnerAwsSigner` instance. -/// Reinstantiates the `InnerAwsSigner`. -macro_rules! inner_aws_signer_call { - ($aws_signer: expr, - $method: ident, - $argument: expr) => { - InnerAwsSigner::new( - &create_kms(&$aws_signer.region), - &$aws_signer.key_id.clone(), - $aws_signer.chain_id, - ) - .await? - .$method($argument) - .await - }; -} - -#[async_trait] -impl Signer for AwsSigner { - type Error = AwsSignerError; - - async fn sign_message>( - &self, - message: S, - ) -> Result { - inner_aws_signer_call!(self, sign_message, message) - } - - async fn sign_transaction( - &self, - message: &TypedTransaction, - ) -> Result { - inner_aws_signer_call!(self, sign_transaction, message) - } - - async fn sign_typed_data( - &self, - payload: &T, - ) -> Result { - inner_aws_signer_call!(self, sign_typed_data, payload) - } - - fn address(&self) -> Address { - self.address - } - - fn chain_id(&self) -> u64 { - self.chain_id - } - - fn with_chain_id>(self, chain_id: T) -> Self { - Self { - key_id: self.key_id.clone(), - chain_id: chain_id.into(), - region: self.region.clone(), - address: self.address, - } - } -} - -#[cfg(test)] -pub mod tests { - use rusoto_core::Region; - - use crate::signer::aws_signer::AwsSigner; - - #[tokio::test] - async fn new_aws_signer_with_error() { - let invalid_key_id = "invalid".to_string(); - let aws_signer = - AwsSigner::new(invalid_key_id, 0, Region::UsEast1).await; - assert!(aws_signer.is_err()); - } -} diff --git a/cmd/authority-claimer/src/signer/mod.rs b/cmd/authority-claimer/src/signer/mod.rs deleted file mode 100644 index 57036b42a..000000000 --- a/cmd/authority-claimer/src/signer/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -mod aws_credentials; -mod aws_signer; -mod signer; - -pub use signer::ConditionalSigner; diff --git a/cmd/authority-claimer/src/signer/signer.rs b/cmd/authority-claimer/src/signer/signer.rs deleted file mode 100644 index 9a1cb5587..000000000 --- a/cmd/authority-claimer/src/signer/signer.rs +++ /dev/null @@ -1,259 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{config::TxSigningConfig, signer::aws_signer::AwsSigner}; -use async_trait::async_trait; -use ethers::{ - signers::{ - coins_bip39::English, AwsSignerError, LocalWallet, MnemonicBuilder, - Signer, WalletError, - }, - types::{ - transaction::{eip2718::TypedTransaction, eip712::Eip712}, - Address, Signature, - }, -}; -use snafu::{ResultExt, Snafu}; - -/// The `ConditionalSigner` is implementing conditional dispatch (instead of -/// dynamic dispatch) by hand for objects that implement the `Sender` trait. -/// -/// We had to do this because (1) we cannot create a `Box` and -/// (2) using parametric types would move this complexity to the main loop, -/// which is undesirable. -#[derive(Debug, Clone)] -pub enum ConditionalSigner { - LocalWallet(LocalWallet), - AwsSigner(AwsSigner), -} - -#[derive(Debug, Snafu)] -pub enum ConditionalSignerError { - #[snafu(display("Local wallet signer error"))] - LocalWallet { source: WalletError }, - - #[snafu(display("AWS KMS signer error"))] - AwsSigner { source: AwsSignerError }, -} - -impl ConditionalSigner { - pub async fn new( - chain_id: u64, - tx_signing_config: &TxSigningConfig, - ) -> Result { - match tx_signing_config.clone() { - TxSigningConfig::PrivateKey { private_key } => { - let wallet = private_key - .inner() - .as_str() - .parse::() - .context(LocalWalletSnafu)? - .with_chain_id(chain_id); - Ok(ConditionalSigner::LocalWallet(wallet)) - } - TxSigningConfig::Mnemonic { - mnemonic, - account_index, - } => { - const DEFAULT_ACCOUNT_INDEX: u32 = 0; - let index = account_index.unwrap_or(DEFAULT_ACCOUNT_INDEX); - let wallet = MnemonicBuilder::::default() - .phrase(mnemonic.inner().as_str()) - .index(index) - .context(LocalWalletSnafu)? - .build() - .context(LocalWalletSnafu)? - .with_chain_id(chain_id); - Ok(ConditionalSigner::LocalWallet(wallet)) - } - TxSigningConfig::Aws { key_id, region } => { - AwsSigner::new(key_id, chain_id, region) - .await - .map(ConditionalSigner::AwsSigner) - .context(AwsSignerSnafu) - } - } - } -} - -#[async_trait] -impl Signer for ConditionalSigner { - type Error = ConditionalSignerError; - - async fn sign_message>( - &self, - message: S, - ) -> Result { - match &self { - Self::LocalWallet(local_wallet) => local_wallet - .sign_message(message) - .await - .context(LocalWalletSnafu), - Self::AwsSigner(aws_signer) => aws_signer - .sign_message(message) - .await - .context(AwsSignerSnafu), - } - } - - async fn sign_transaction( - &self, - message: &TypedTransaction, - ) -> Result { - match &self { - Self::LocalWallet(local_wallet) => local_wallet - .sign_transaction(message) - .await - .context(LocalWalletSnafu), - Self::AwsSigner(aws_signer) => aws_signer - .sign_transaction(message) - .await - .context(AwsSignerSnafu), - } - } - - async fn sign_typed_data( - &self, - payload: &T, - ) -> Result { - match &self { - Self::LocalWallet(local_wallet) => local_wallet - .sign_typed_data(payload) - .await - .context(LocalWalletSnafu), - Self::AwsSigner(aws_signer) => aws_signer - .sign_typed_data(payload) - .await - .context(AwsSignerSnafu), - } - } - - fn address(&self) -> Address { - match &self { - Self::LocalWallet(local_wallet) => local_wallet.address(), - Self::AwsSigner(aws_signer) => aws_signer.address(), - } - } - - fn chain_id(&self) -> u64 { - match &self { - Self::LocalWallet(local_wallet) => local_wallet.chain_id(), - Self::AwsSigner(aws_signer) => aws_signer.chain_id(), - } - } - - fn with_chain_id>(self, chain_id: T) -> Self { - match &self { - Self::LocalWallet(local_wallet) => { - Self::LocalWallet(local_wallet.clone().with_chain_id(chain_id)) - } - Self::AwsSigner(aws_signer) => { - Self::AwsSigner(aws_signer.clone().with_chain_id(chain_id)) - } - } - } -} - -#[cfg(test)] -mod tests { - use crate::redacted::Redacted; - use crate::{config::TxSigningConfig, signer::ConditionalSigner}; - use ethers::types::{ - transaction::{eip2718::TypedTransaction, eip2930::AccessList}, - Address, Eip1559TransactionRequest, - }; - use ethers_signers::Signer; - - // -------------------------------------------------------------------------------------------- - // new - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn new_local_wallet_mnemonic_conditional_signer() { - let conditional_signer = - local_wallet_mnemonic_conditional_signer().await; - assert!(matches!( - conditional_signer, - ConditionalSigner::LocalWallet(_) - )); - } - - #[tokio::test] - async fn new_local_wallet_private_key_conditional_signer() { - let conditional_signer = - local_wallet_private_key_conditional_signer().await; - assert!(matches!( - conditional_signer, - ConditionalSigner::LocalWallet(_) - )); - } - - // -------------------------------------------------------------------------------------------- - // sign_transaction - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn sign_transaction_with_mnemonic_local_wallet_conditional_signer() { - let conditional_signer = - local_wallet_mnemonic_conditional_signer().await; - let message = eip1559_message(); - let result = conditional_signer.sign_transaction(&message).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn sign_transaction_with_private_key_local_wallet_conditional_signer() - { - let conditional_signer = - local_wallet_private_key_conditional_signer().await; - let message = eip1559_message(); - let result = conditional_signer.sign_transaction(&message).await; - assert!(result.is_ok()); - } - - // -------------------------------------------------------------------------------------------- - // auxiliary - // -------------------------------------------------------------------------------------------- - - const CHAIN_ID: u64 = 1; - const PRIVATE_KEY: &str = - "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; - const MNEMONIC: &str = - "indoor dish desk flag debris potato excuse depart ticket judge file exit"; - - async fn local_wallet_mnemonic_conditional_signer() -> ConditionalSigner { - let tx_signing_config = TxSigningConfig::Mnemonic { - mnemonic: Redacted::new(MNEMONIC.to_string()), - account_index: Some(1), - }; - ConditionalSigner::new(CHAIN_ID, &tx_signing_config) - .await - .unwrap() - } - - async fn local_wallet_private_key_conditional_signer() -> ConditionalSigner - { - let tx_signing_config = TxSigningConfig::PrivateKey { - private_key: Redacted::new(PRIVATE_KEY.to_string()), - }; - ConditionalSigner::new(CHAIN_ID, &tx_signing_config) - .await - .unwrap() - } - - fn eip1559_message() -> TypedTransaction { - TypedTransaction::Eip1559( - Eip1559TransactionRequest::new() - .from(Address::default()) - .to(Address::default()) - .gas(555) - .value(1337) - .data(vec![1, 2, 3]) - .nonce(1) - .access_list(AccessList::default()) - .max_priority_fee_per_gas(10) - .max_fee_per_gas(20) - .chain_id(CHAIN_ID), - ) - } -} diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go index 8c7ab1bc1..a01b2bf07 100644 --- a/internal/claimer/side-effects.go +++ b/internal/claimer/side-effects.go @@ -101,7 +101,7 @@ func (s *Service) updateEpochWithSubmittedClaim( claim *ComputedClaim, txHash Hash, ) error { - _, err := DBConn.UpdateEpochWithSubmittedClaim(context, claim.EpochID, txHash) + err := DBConn.UpdateEpochWithSubmittedClaim(context, claim.EpochID, txHash) if err != nil { s.Logger.Error("UpdateEpochWithSubmittedClaim:failed", "service", s.Name, diff --git a/internal/node/services.go b/internal/node/services.go index 4d98ed26d..5bdc0ea94 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -7,7 +7,6 @@ import ( "fmt" "log/slog" "net/http" - "os" advancerservice "github.com/cartesi/rollups-node/internal/advancer/service" claimerservice "github.com/cartesi/rollups-node/internal/claimer" @@ -24,7 +23,6 @@ type portOffset = int const ( portOffsetProxy = iota - portOffsetAuthorityClaimer ) // Get the port of the given service. @@ -32,65 +30,6 @@ func getPort(c config.NodeConfig, offset portOffset) int { return c.HttpPort + int(offset) } -// Create the RUST_LOG variable using the config log level. -// If the log level is set to debug, set tracing log for the given rust module. -func getRustLog(c config.NodeConfig, rustModule string) string { - switch c.LogLevel { - case slog.LevelDebug: - return fmt.Sprintf("RUST_LOG=info,%v=trace", rustModule) - case slog.LevelInfo: - return "RUST_LOG=info" - case slog.LevelWarn: - return "RUST_LOG=warn" - case slog.LevelError: - return "RUST_LOG=error" - default: - panic("impossible") - } -} - -func newAuthorityClaimer(c config.NodeConfig, workDir string) services.Service { - var s services.CommandService - s.Name = "authority-claimer" - s.HealthcheckPort = getPort(c, portOffsetAuthorityClaimer) - s.Path = "cartesi-rollups-authority-claimer" - s.Env = append(s.Env, "LOG_ENABLE_TIMESTAMP=false") - s.Env = append(s.Env, "LOG_ENABLE_COLOR=false") - s.Env = append(s.Env, getRustLog(c, "authority_claimer")) - s.Env = append(s.Env, fmt.Sprintf("TX_PROVIDER_HTTP_ENDPOINT=%v", - c.BlockchainHttpEndpoint.Value)) - s.Env = append(s.Env, fmt.Sprintf("TX_CHAIN_ID=%v", c.BlockchainID)) - s.Env = append(s.Env, fmt.Sprintf("TX_CHAIN_IS_LEGACY=%v", c.LegacyBlockchainEnabled)) - s.Env = append(s.Env, fmt.Sprintf("TX_DEFAULT_CONFIRMATIONS=%v", - c.BlockchainFinalityOffset)) - s.Env = append(s.Env, fmt.Sprintf("POSTGRES_ENDPOINT=%v", - fmt.Sprintf("%v", c.PostgresEndpoint.Value))) - s.Env = append(s.Env, fmt.Sprintf("POLLING_INTERVAL=%v", c.ClaimerPollingInterval.Seconds())) - s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_ADDRESS=%v", c.ContractsInputBoxAddress)) - s.Env = append(s.Env, fmt.Sprintf("GENESIS_BLOCK=%v", - c.ContractsInputBoxDeploymentBlockNumber)) - s.Env = append(s.Env, fmt.Sprintf("HTTP_SERVER_PORT=%v", - getPort(c, portOffsetAuthorityClaimer))) - switch auth := c.Auth.(type) { - case config.AuthPrivateKey: - s.Env = append(s.Env, fmt.Sprintf("TX_SIGNING_PRIVATE_KEY=%v", - auth.PrivateKey.Value)) - case config.AuthMnemonic: - s.Env = append(s.Env, fmt.Sprintf("TX_SIGNING_MNEMONIC=%v", auth.Mnemonic.Value)) - s.Env = append(s.Env, fmt.Sprintf("TX_SIGNING_MNEMONIC_ACCOUNT_INDEX=%v", - auth.AccountIndex.Value)) - case config.AuthAWS: - s.Env = append(s.Env, fmt.Sprintf("TX_SIGNING_AWS_KMS_KEY_ID=%v", auth.KeyID.Value)) - s.Env = append(s.Env, fmt.Sprintf("TX_SIGNING_AWS_KMS_REGION=%v", - auth.Region.Value)) - default: - panic("invalid auth config") - } - s.Env = append(s.Env, os.Environ()...) - s.WorkDir = workDir - return &s -} - func newSupervisorService( c config.NodeConfig, workDir string, @@ -99,7 +38,7 @@ func newSupervisorService( var s []services.Service if c.FeatureClaimerEnabled { - s = append(s, newAuthorityClaimer(c, workDir)) + s = append(s, newClaimerService(c, database)) } serveMux := http.NewServeMux() @@ -109,7 +48,6 @@ func newSupervisorService( s = append(s, newAdvancerService(c, database, serveMux)) s = append(s, newValidatorService(c, database)) s = append(s, newHttpService(c, serveMux)) - s = append(s, newClaimerService(c, database)) supervisor := services.SupervisorService{ Name: "rollups-node", diff --git a/internal/repository/claimer.go b/internal/repository/claimer.go index 093f3c668..ccf99b836 100644 --- a/internal/repository/claimer.go +++ b/internal/repository/claimer.go @@ -5,12 +5,16 @@ package repository import ( "context" + "fmt" . "github.com/cartesi/rollups-node/internal/model" "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" +) + +var ( + ErrNoUpdate = fmt.Errorf("update did not take effect") ) type ComputedClaim struct { @@ -69,7 +73,7 @@ func (pg *Database) UpdateEpochWithSubmittedClaim( ctx context.Context, id uint64, transaction_hash common.Hash, -) (pgconn.CommandTag, error) { +) error { query := ` UPDATE epoch @@ -85,5 +89,13 @@ func (pg *Database) UpdateEpochWithSubmittedClaim( "status": EpochStatusClaimSubmitted, "prevStatus": EpochStatusClaimComputed, } - return pg.db.Exec(ctx, query, args) + tag, err := pg.db.Exec(ctx, query, args) + + if err != nil { + return err + } + if tag.RowsAffected() == 0 { + return ErrNoUpdate + } + return nil } diff --git a/internal/repository/claimer_test.go b/internal/repository/claimer_test.go new file mode 100644 index 000000000..c907582f6 --- /dev/null +++ b/internal/repository/claimer_test.go @@ -0,0 +1,167 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package repository + +import ( + "context" + "testing" + . "github.com/cartesi/rollups-node/internal/model" + "github.com/ethereum/go-ethereum/common" + "github.com/cartesi/rollups-node/test/tooling/db" + "github.com/stretchr/testify/require" +) + +func setup(t *testing.T, ctx context.Context) (*require.Assertions, *Database, error) { + require := require.New(t) + + var err error + endpoint, err := db.GetPostgresTestEndpoint() + require.Nil(err) + + err = db.SetupTestPostgres(endpoint) + require.Nil(err) + + database, err := Connect(ctx, endpoint) + require.Nil(err) + require.NotNil(database) + + return require, database, nil +} + +func TestClaimerRepository(t *testing.T) { + ctx := context.Background() + + t.Run("EmptySelectComputedClaims", func(t *testing.T) { + require, database, err := setup(t, ctx) + + computedClaims, err := database.SelectComputedClaims(ctx) + require.Nil(err) + require.Empty(computedClaims) + }) + + t.Run("SelectComputedClaims", func(t *testing.T) { + require, database, err := setup(t, ctx) + + app := Application{ + Id: 1, + ContractAddress: common.HexToAddress("deadbeef"), + TemplateHash: common.HexToHash("deadbeef"), + LastProcessedBlock: 1, + Status: ApplicationStatusRunning, + IConsensusAddress: common.HexToAddress("ffffff"), + } + _, err = database.InsertApplication(ctx, &app) + require.Nil(err) + + lastBlock := []uint64{99, 200} + epochs := []Epoch{ + { + Id: 1, + Index: 0, + FirstBlock: 0, + LastBlock: lastBlock[0], + AppAddress: app.ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + },{ + Id: 2, + Index: 1, + FirstBlock: lastBlock[0]+1, + LastBlock: lastBlock[1], + AppAddress: app.ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, + } + + for _, epoch := range(epochs) { + _, err = database.InsertEpoch(ctx, &epoch) + require.Nil(err) + } + + computedClaims, err := database.SelectComputedClaims(ctx) + require.Nil(err) + require.Len(computedClaims, 2) + + for i, computedClaim := range(computedClaims) { + require.Equal(computedClaim.EpochID, epochs[i].Id) + require.Equal(computedClaim.Hash, *epochs[i].ClaimHash) + require.Equal(computedClaim.AppContractAddress, app.ContractAddress) + require.Equal(computedClaim.AppIConsensusAddress, app.IConsensusAddress) + } + }) + + t.Run("TestUpdateEpochWithSubmittedClaim", func(t *testing.T) { + require, database, err := setup(t, ctx) + + app := Application{ + Id: 1, + ContractAddress: common.HexToAddress("deadbeef"), + TemplateHash: common.HexToHash("deadbeef"), + LastProcessedBlock: 1, + Status: ApplicationStatusRunning, + IConsensusAddress: common.HexToAddress("ffffff"), + } + _, err = database.InsertApplication(ctx, &app) + require.Nil(err) + + epoch := Epoch{ + Id: 1, + Index: 0, + FirstBlock: 0, + LastBlock: 100, + AppAddress: app.ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + } + + id, err := database.InsertEpoch(ctx, &epoch) + require.Nil(err) + + transactionHash := common.HexToHash("0x10") + err = database.UpdateEpochWithSubmittedClaim(ctx, id, transactionHash) + require.Nil(err) + + updatedEpoch, err := database.GetEpoch(ctx, epoch.Index, epoch.AppAddress) + require.Nil(err) + require.Equal(updatedEpoch.Status, EpochStatusClaimSubmitted) + require.Equal(updatedEpoch.TransactionHash, &transactionHash) + }) + + t.Run("TestFailUpdateEpochWithSubmittedClaim", func(t *testing.T) { + require, database, err := setup(t, ctx) + + app := Application{ + Id: 1, + ContractAddress: common.HexToAddress("deadbeef"), + TemplateHash: common.HexToHash("deadbeef"), + LastProcessedBlock: 1, + Status: ApplicationStatusRunning, + IConsensusAddress: common.HexToAddress("ffffff"), + } + _, err = database.InsertApplication(ctx, &app) + require.Nil(err) + + transactionHash := common.HexToHash("0x10") + epoch := Epoch{ + Id: 1, + Index: 0, + FirstBlock: 0, + LastBlock: 100, + AppAddress: app.ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: &transactionHash, + Status: EpochStatusClaimSubmitted, + } + + id, err := database.InsertEpoch(ctx, &epoch) + require.Nil(err) + + err = database.UpdateEpochWithSubmittedClaim(ctx, id, transactionHash) + require.Equal(err, ErrNoUpdate) + }) +} diff --git a/internal/services/command.go b/internal/services/command.go deleted file mode 100644 index b3112cda1..000000000 --- a/internal/services/command.go +++ /dev/null @@ -1,136 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package services - -import ( - "context" - "fmt" - "log/slog" - "net" - "os/exec" - "regexp" - "strings" - "syscall" - "time" - - "github.com/cartesi/rollups-node/internal/services/linewriter" -) - -const ( - DefaultPollInterval = 100 * time.Millisecond -) - -// CommandService encapsulates the execution of an executable via exec.Command. -// It assumes the executable accepts TCP connections at HealthcheckPort, -// which it uses to determine if it is ready or not. -type CommandService struct { - - // Name that identifies the service. - Name string - - // Port used to verify if the service is ready. - HealthcheckPort int - - // Path to the service binary. - Path string - - // Args to the service binary. - Args []string - - // Environment variables. - Env []string - - // Working Directory - WorkDir string -} - -func (s *CommandService) Start(ctx context.Context, ready chan<- struct{}) error { - cmd := exec.CommandContext(ctx, s.Path, s.Args...) - cmd.Env = s.Env - cmd.Stderr = linewriter.New(&commandLogger{s.Name}) - cmd.Stdout = linewriter.New(&commandLogger{s.Name}) - cmd.Cancel = func() error { - err := cmd.Process.Signal(syscall.SIGTERM) - if err != nil { - slog.Warn("Failed to send SIGTERM", "service", s, "error", err) - } - return err - } - - if s.WorkDir != "" { - cmd.Dir = s.WorkDir - } - - go s.pollTcp(ctx, ready) - err := cmd.Run() - - if ctx.Err() != nil { - return ctx.Err() - } - return err -} - -// Blocks until the service is ready or the context is canceled. -func (s *CommandService) pollTcp(ctx context.Context, ready chan<- struct{}) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - for { - conn, err := net.Dial("tcp", fmt.Sprintf("0.0.0.0:%v", s.HealthcheckPort)) - if err == nil { - slog.Debug("Service is ready", "service", s) - conn.Close() - ready <- struct{}{} - return - } - select { - case <-ctx.Done(): - return - case <-time.After(DefaultPollInterval): - } - } -} - -func (s *CommandService) String() string { - return s.Name -} - -// A wrapper around slog.Default that writes log output from a services.CommandService -// to the correct log level -type commandLogger struct { - Name string -} - -func (l *commandLogger) Write(data []byte) (int, error) { - // If data does has no alphanumeric characters, ignore it. - if match := alphanumericRegex.Find(data); match == nil { - return 0, nil - } - msg := strings.TrimSpace(string(data)) - level := logLevelForMessage(msg) - slog.Log(context.Background(), level, msg, "service", l.Name) - return len(msg), nil -} - -var ( - errorRegex = regexp.MustCompile(`(?i)(error|fatal)`) - warnRegex = regexp.MustCompile(`(?i)warn`) - infoRegex = regexp.MustCompile(`(?i)info`) - debugRegex = regexp.MustCompile(`(?i)(debug|trace)`) - alphanumericRegex = regexp.MustCompile("[a-zA-Z0-9]") -) - -// Uses regular expressions to determine the correct log level. If there is no match, -// returns slog.LevelInfo -func logLevelForMessage(msg string) slog.Level { - if match := infoRegex.FindString(msg); len(match) > 0 { - return slog.LevelInfo - } else if match = debugRegex.FindString(msg); len(match) > 0 { - return slog.LevelDebug - } else if match = warnRegex.FindString(msg); len(match) > 0 { - return slog.LevelWarn - } else if match = errorRegex.FindString(msg); len(match) > 0 { - return slog.LevelError - } - return slog.LevelInfo -} diff --git a/internal/services/command_test.go b/internal/services/command_test.go deleted file mode 100644 index 767153744..000000000 --- a/internal/services/command_test.go +++ /dev/null @@ -1,142 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package services - -import ( - "context" - "fmt" - "os" - "os/exec" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/suite" -) - -type CommandServiceSuite struct { - suite.Suite - tmpDir string - servicePort int -} - -func (s *CommandServiceSuite) SetupSuite() { - s.buildFakeService() - s.servicePort = 55555 -} - -func (s *CommandServiceSuite) TearDownSuite() { - err := os.RemoveAll(s.tmpDir) - if err != nil { - panic(err) - } -} - -func (s *CommandServiceSuite) SetupTest() { - s.servicePort++ - serviceAddress := "0.0.0.0:" + fmt.Sprint(s.servicePort) - os.Setenv("SERVICE_ADDRESS", serviceAddress) -} - -func (s *CommandServiceSuite) TestItStopsWhenContextIsCancelled() { - service := CommandService{ - Name: "fake-service", - Path: "fake-service", - HealthcheckPort: s.servicePort, - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // start service in goroutine - result := make(chan error) - ready := make(chan struct{}) - go func() { - result <- service.Start(ctx, ready) - }() - - // assert service started successfully - select { - case err := <-result: - s.FailNow("service failed to start", err) - case <-ready: - } - - cancel() - - err := <-result - s.ErrorIs(err, context.Canceled, "service exited for the wrong reason: %v", err) -} - -// Service should stop if timeout is reached and it isn't ready yet -func (s *CommandServiceSuite) TestItTimesOut() { - service := CommandService{ - Name: "fake-service", - Path: "fake-service", - HealthcheckPort: 0, // wrong port - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // start service in goroutine˝ - result := make(chan error) - ready := make(chan struct{}) - go func() { - result <- service.Start(ctx, ready) - }() - - // expect timeout because of wrong port - select { - case <-ready: - s.FailNow("service should have timed out") - case <-time.After(2 * time.Second): - cancel() - err := <-result - s.ErrorIs(err, context.Canceled, "service exited for the wrong reason: %v", err) - } -} - -func (s *CommandServiceSuite) TestItFailsToStartIfExecutableNotInPath() { - service := CommandService{ - Name: "fake-service", - Path: "wrong-path", - HealthcheckPort: s.servicePort, - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ready := make(chan struct{}) - - err := service.Start(ctx, ready) - - s.ErrorIs(err, exec.ErrNotFound, "service exited for the wrong reason: %v", err) -} - -// Builds the fake-service binary and adds it to PATH -func (s *CommandServiceSuite) buildFakeService() { - tempDir, err := os.MkdirTemp("", "") - if err != nil { - panic(err) - } - s.tmpDir = tempDir - binaryPath := filepath.Join(s.tmpDir, "fake-service") - - cmd := exec.Command( - "go", - "build", - "-o", - binaryPath, - "fakeservice/main.go", - ) - if err := cmd.Run(); err != nil { - panic(err) - } - if _, err := os.Stat(binaryPath); os.IsNotExist(err) { - panic("fake-service binary was not created") - } - - os.Setenv("PATH", os.Getenv("PATH")+":"+s.tmpDir) -} - -func TestCommandService(t *testing.T) { - suite.Run(t, new(CommandServiceSuite)) -} diff --git a/internal/services/fakeservice/main.go b/internal/services/fakeservice/main.go deleted file mode 100644 index 17b7ebeaa..000000000 --- a/internal/services/fakeservice/main.go +++ /dev/null @@ -1,17 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -// This file creates a dummy webserver with the sole purpose of being used -// as a binary to test the services.Service struct -package main - -import ( - "net/http" - "os" -) - -func main() { - addr := os.Getenv("SERVICE_ADDRESS") - err := http.ListenAndServe(addr, nil) - panic(err) -}