diff --git a/.env.test b/.env.test
index ab0ea15f..c51cf932 100644
--- a/.env.test
+++ b/.env.test
@@ -8,6 +8,7 @@ PORT=3000
 AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID"
 AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY"
 AWS_REGION="us-east-1"
+AWS_SNS_REGION="us-east-1"
 AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
 AWS_DEFAULT_REGION="localhost"

@@ -27,7 +28,6 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45

 ##### SNS #####

 ALERTS="sns"
-AWS_SNS_REGION="us-east-1"
 AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
 AWS_SNS_ARN_NAME="madara-orchestrator-arn"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index de75ddb6..e9226788 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -33,6 +33,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

 ## Changed

+- AWS config built from TestConfigBuilder.
+- Better TestConfigBuilder, with sync config clients.
+- Drilled Config, removing dirty global reads.
 - settings provider
 - refactor AWS config usage and clean .env files
 - GitHub's coverage CI yml file for localstack and db testing.
diff --git a/Cargo.lock b/Cargo.lock
index 453ec491..14db58fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1090,12 +1090,6 @@ version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"

-[[package]]
-name = "arc-swap"
-version = "1.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
-
 [[package]]
 name = "ark-ec"
 version = "0.4.2"
@@ -6423,7 +6417,6 @@ name = "orchestrator"
 version = "0.1.0"
 dependencies = [
  "alloy 0.2.1",
- "arc-swap",
  "assert_matches",
  "async-std",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 73347e98..78a46eae 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -70,7 +70,6 @@ url = { version = "2.5.0", features = ["serde"] }
 uuid = { version = "1.7.0", features = ["v4", "serde"] }
 httpmock = { version = "0.7.0", features = ["remote"] }
 num-bigint = { version = "0.4.4" }
-arc-swap = { version = "1.7.0" }
 num-traits = "0.2"
 lazy_static = "1.4.0"
 stark_evm_adapter = "0.1.1"
diff --git a/crates/da-clients/ethereum/src/config.rs b/crates/da-clients/ethereum/src/config.rs
index 0ac3b72e..3239530c 100644
--- a/crates/da-clients/ethereum/src/config.rs
+++ b/crates/da-clients/ethereum/src/config.rs
@@ -11,9 +11,9 @@ pub struct EthereumDaConfig {
 impl EthereumDaConfig {
     pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
         Ok(Self {
-            rpc_url: settings.get_settings("SETTLEMENT_RPC_URL")?,
-            memory_pages_contract: settings.get_settings("MEMORY_PAGES_CONTRACT_ADDRESS")?,
-            private_key: settings.get_settings("PRIVATE_KEY")?,
+            rpc_url: settings.get_settings_or_panic("SETTLEMENT_RPC_URL"),
+            memory_pages_contract: settings.get_settings_or_panic("MEMORY_PAGES_CONTRACT_ADDRESS"),
+            private_key: settings.get_settings_or_panic("PRIVATE_KEY"),
         })
     }
 }
diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml
index 7f9540d4..50ae5eac 100644
--- a/crates/orchestrator/Cargo.toml
+++ b/crates/orchestrator/Cargo.toml
@@ -13,7 +13,6 @@ path = "src/main.rs"

 [dependencies]
 alloy = { workspace = true }
-arc-swap = { workspace = true }
 assert_matches = "1.5.0"
 async-std = "1.12.0"
 async-trait = { workspace = true }
diff --git a/crates/orchestrator/src/alerts/aws_sns/config.rs b/crates/orchestrator/src/alerts/aws_sns/config.rs
index c861ba88..e181a4af 100644
--- a/crates/orchestrator/src/alerts/aws_sns/config.rs
+++ b/crates/orchestrator/src/alerts/aws_sns/config.rs
@@ -12,8 +12,8 @@ pub struct AWSSNSConfig {
 impl AWSSNSConfig {
     pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
         Ok(Self {
-            sns_arn: settings.get_settings("AWS_SNS_ARN")?,
-            sns_arn_region: settings.get_settings("AWS_SNS_REGION")?,
+            sns_arn: settings.get_settings_or_panic("AWS_SNS_ARN"),
+            sns_arn_region: settings.get_settings_or_panic("AWS_SNS_REGION"),
         })
     }
 }
diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs
index 2f8740c3..ef8d5eed 100644
--- a/crates/orchestrator/src/alerts/aws_sns/mod.rs
+++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs
@@ -1,5 +1,7 @@
 mod config;

+use std::sync::Arc;
+
 use crate::alerts::aws_sns::config::AWSSNSConfig;
 use crate::alerts::Alerts;
 use crate::config::ProviderConfig;
@@ -15,14 +17,11 @@ pub struct AWSSNS {
 }

 impl AWSSNS {
-    pub async fn new_with_settings(settings: &impl Settings, provider_config: ProviderConfig) -> Self {
-        match provider_config {
-            ProviderConfig::AWS(aws_config) => {
-                let sns_config = AWSSNSConfig::new_with_settings(settings)
-                    .expect("Not able to get Aws sns config from provided settings");
-                Self { client: Client::new(&aws_config), topic_arn: sns_config.sns_arn }
-            }
-        }
+    pub async fn new_with_settings(settings: &impl Settings, provider_config: Arc<ProviderConfig>) -> Self {
+        let sns_config =
+            AWSSNSConfig::new_with_settings(settings).expect("Not able to get Aws sns config from provided settings");
+        let config = provider_config.get_aws_client_or_panic();
+        Self { client: Client::new(config), topic_arn: sns_config.sns_arn }
     }
 }
diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs
index 14056aab..4d4ddd8a 100644
--- a/crates/orchestrator/src/config.rs
+++ b/crates/orchestrator/src/config.rs
@@ -7,26 +7,27 @@ use std::str::FromStr;
 use std::sync::Arc;

-use crate::alerts::aws_sns::AWSSNS;
-use crate::alerts::Alerts;
-use crate::data_storage::aws_s3::AWSS3;
-use crate::data_storage::DataStorage;
-use arc_swap::{ArcSwap, Guard};
-use aws_config::meta::region::RegionProviderChain;
-use aws_config::{Region, SdkConfig};
-use aws_credential_types::Credentials;
-use da_client_interface::DaClient;
+use aws_config::SdkConfig;
 use dotenvy::dotenv;
-use ethereum_da_client::EthereumDaClient;
+use starknet::providers::jsonrpc::HttpTransport;
+use starknet::providers::{JsonRpcClient, Url};
+
+use da_client_interface::DaClient;
 use ethereum_settlement_client::EthereumSettlementClient;
 use prover_client_interface::ProverClient;
 use settlement_client_interface::SettlementClient;
 use sharp_service::SharpProverService;
-use starknet::providers::jsonrpc::HttpTransport;
-use starknet::providers::{JsonRpcClient, Url};
 use starknet_settlement_client::StarknetSettlementClient;
-use tokio::sync::OnceCell;
 use utils::env_utils::get_env_var_or_panic;
+
+use crate::alerts::aws_sns::AWSSNS;
+use crate::alerts::Alerts;
+use crate::data_storage::aws_s3::AWSS3;
+use crate::data_storage::DataStorage;
+use aws_config::meta::region::RegionProviderChain;
+use aws_config::Region;
+use aws_credential_types::Credentials;
+use ethereum_da_client::EthereumDaClient;
 use utils::settings::env::EnvSettingsProvider;
 use utils::settings::Settings;
@@ -61,47 +62,50 @@ pub struct Config {
 ///
 /// We are using Arc because the config size is large and keeping it
 /// a pointer is a better way to pass it through.
+#[derive(Clone)]
 pub enum ProviderConfig {
-    AWS(Arc<SdkConfig>),
+    AWS(Box<SdkConfig>),
+}
+
+impl ProviderConfig {
+    pub fn get_aws_client_or_panic(&self) -> &SdkConfig {
+        match self {
+            ProviderConfig::AWS(config) => config.as_ref(),
+        }
+    }
 }

 /// To build a `SdkConfig` for AWS provider.
 pub async fn get_aws_config(settings_provider: &impl Settings) -> SdkConfig {
-    let region = settings_provider
-        .get_settings("AWS_REGION")
-        .expect("Not able to get AWS_REGION from provided settings provider.");
+    let region = settings_provider.get_settings_or_panic("AWS_REGION");
     let region_provider = RegionProviderChain::first_try(Region::new(region)).or_default_provider();
     let credentials = Credentials::from_keys(
-        settings_provider
-            .get_settings("AWS_ACCESS_KEY_ID")
-            .expect("Not able to get AWS_ACCESS_KEY_ID from provided settings provider."),
-        settings_provider
-            .get_settings("AWS_SECRET_ACCESS_KEY")
-            .expect("Not able to get AWS_SECRET_ACCESS_KEY from provided settings provider."),
+        settings_provider.get_settings_or_panic("AWS_ACCESS_KEY_ID"),
+        settings_provider.get_settings_or_panic("AWS_SECRET_ACCESS_KEY"),
         None,
     );
     aws_config::from_env().credentials_provider(credentials).region(region_provider).load().await
 }

 /// Initializes the app config
-pub async fn init_config() -> Config {
+pub async fn init_config() -> Arc<Config> {
     dotenv().ok();

+    let settings_provider = EnvSettingsProvider {};
+    let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await)));
+
     // init starknet client
     let provider = JsonRpcClient::new(HttpTransport::new(
-        Url::parse(get_env_var_or_panic("MADARA_RPC_URL").as_str()).expect("Failed to parse URL"),
+        Url::parse(settings_provider.get_settings_or_panic("MADARA_RPC_URL").as_str()).expect("Failed to parse URL"),
    ));

-    let settings_provider = EnvSettingsProvider {};
-    let aws_config = Arc::new(get_aws_config(&settings_provider).await);
-
     // init database
     let database = build_database_client(&settings_provider).await;
     let da_client = build_da_client(&settings_provider).await;
     let settlement_client = build_settlement_client(&settings_provider).await;
     let prover_client = build_prover_service(&settings_provider);
-    let storage_client = build_storage_client(&settings_provider, ProviderConfig::AWS(Arc::clone(&aws_config))).await;
-    let alerts_client = build_alert_client(&settings_provider, ProviderConfig::AWS(Arc::clone(&aws_config))).await;
+    let storage_client = build_storage_client(&settings_provider, provider_config.clone()).await;
+    let alerts_client = build_alert_client(&settings_provider, provider_config.clone()).await;

     // init the queue
     // TODO: we use omniqueue for now which doesn't support loading AWS config
@@ -109,7 +113,7 @@
     // us stop using the generic omniqueue abstractions for message ack/nack
     let queue = build_queue_client();

-    Config::new(
+    Arc::new(Config::new(
         Arc::new(provider),
         da_client,
         prover_client,
@@ -118,7 +122,7 @@
         queue,
         storage_client,
         alerts_client,
-    )
+    ))
 }

 impl Config {
@@ -178,33 +182,6 @@ impl Config {
     }
 }

-/// The app config. It can be accessed from anywhere inside the service.
-/// It's initialized only once.
-/// We are using `ArcSwap` as it allow us to replace the new `Config` with
-/// a new one which is required when running test cases. This approach was
-/// inspired from here - https://github.com/matklad/once_cell/issues/127
-pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();
-
-/// Returns the app config. Initializes if not already done.
-pub async fn config() -> Guard<Arc<Config>> {
-    let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
-    cfg.load()
-}
-
-/// OnceCell only allows us to initialize the config once and that's how it should be on production.
-/// However, when running tests, we often want to reinitialize because we want to clear the DB and
-/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
-/// stored config inside `ArcSwap` with the new configuration and pool settings.
-#[cfg(test)]
-pub async fn config_force_init(config: Config) {
-    match CONFIG.get() {
-        Some(arc) => arc.store(Arc::new(config)),
-        None => {
-            CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
-        }
-    }
-}
-
 /// Builds the DA client based on the environment variable DA_LAYER
 pub async fn build_da_client(settings_provider: &impl Settings) -> Box<dyn DaClient> {
     match get_env_var_or_panic("DA_LAYER").as_str() {
@@ -246,7 +223,7 @@ pub async fn build_settlement_client(settings_provider: &impl Settings) -> Box<
 pub async fn build_storage_client(
     settings_provider: &impl Settings,
-    provider_config: ProviderConfig,
+    provider_config: Arc<ProviderConfig>,
 ) -> Box<dyn DataStorage> {
     match get_env_var_or_panic("DATA_STORAGE").as_str() {
         "s3" => Box::new(AWSS3::new_with_settings(settings_provider, provider_config).await),
@@ -256,7 +233,7 @@
 pub async fn build_alert_client(
     settings_provider: &impl Settings,
-    provider_config: ProviderConfig,
+    provider_config: Arc<ProviderConfig>,
 ) -> Box<dyn Alerts> {
     match get_env_var_or_panic("ALERTS").as_str() {
         "sns" => Box::new(AWSSNS::new_with_settings(settings_provider, provider_config).await),
diff --git a/crates/orchestrator/src/data_storage/aws_s3/config.rs b/crates/orchestrator/src/data_storage/aws_s3/config.rs
index 5c542e3f..7217099a 100644
--- a/crates/orchestrator/src/data_storage/aws_s3/config.rs
+++ b/crates/orchestrator/src/data_storage/aws_s3/config.rs
@@ -14,10 +14,6 @@ pub struct AWSS3Config {
 impl DataStorageConfig for AWSS3Config {
     /// To return the config struct by creating it from the environment variables.
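
Editorial note (not part of the diff): every AWS-backed client in this PR is now built from the single `SdkConfig` carried inside `ProviderConfig`, instead of each client loading its own region and credentials from the environment. A minimal sketch of the pattern the S3 and SNS call sites follow; the `ExampleSqs` wrapper below is hypothetical and only illustrates the shape:

```rust
use std::sync::Arc;

use aws_config::SdkConfig;

// Same shape as the ProviderConfig introduced in this PR: one loaded
// SdkConfig, shared by every AWS client instead of per-client env reads.
pub enum ProviderConfig {
    AWS(Box<SdkConfig>),
}

impl ProviderConfig {
    pub fn get_aws_client_or_panic(&self) -> &SdkConfig {
        match self {
            ProviderConfig::AWS(config) => config.as_ref(),
        }
    }
}

// Hypothetical wrapper showing how another AWS client would plug in.
pub struct ExampleSqs {
    client: aws_sdk_sqs::Client,
}

impl ExampleSqs {
    pub fn new(provider_config: Arc<ProviderConfig>) -> Self {
        // The enum is destructured in exactly one place; call sites stay
        // provider-agnostic and just hand the Arc around.
        let config = provider_config.get_aws_client_or_panic();
        Self { client: aws_sdk_sqs::Client::new(config) }
    }
}
```
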
     fn new_with_settings(settings: &impl Settings) -> Self {
-        Self {
-            bucket_name: settings
-                .get_settings("AWS_S3_BUCKET_NAME")
-                .expect("Not able to get AWS_S3_BUCKET_NAME from settings provided."),
-        }
+        Self { bucket_name: settings.get_settings_or_panic("AWS_S3_BUCKET_NAME") }
     }
 }
diff --git a/crates/orchestrator/src/data_storage/aws_s3/mod.rs b/crates/orchestrator/src/data_storage/aws_s3/mod.rs
index 8315ccd6..1547426d 100644
--- a/crates/orchestrator/src/data_storage/aws_s3/mod.rs
+++ b/crates/orchestrator/src/data_storage/aws_s3/mod.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use crate::config::ProviderConfig;
 use crate::data_storage::aws_s3::config::AWSS3Config;
 use crate::data_storage::{DataStorage, DataStorageConfig};
@@ -24,18 +26,15 @@ pub struct AWSS3 {
 /// - initializing a new AWS S3 client
 impl AWSS3 {
     /// To init the struct with main settings
-    pub async fn new_with_settings(settings: &impl Settings, provider_config: ProviderConfig) -> Self {
-        match provider_config {
-            ProviderConfig::AWS(aws_config) => {
-                let s3_config = AWSS3Config::new_with_settings(settings);
-                // Building AWS S3 config
-                let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config.as_ref());
-                // this is necessary for it to work with localstack in test cases
-                s3_config_builder.set_force_path_style(Some(true));
-                let client = Client::from_conf(s3_config_builder.build());
-                Self { client, bucket: s3_config.bucket_name }
-            }
-        }
+    pub async fn new_with_settings(settings: &impl Settings, provider_config: Arc<ProviderConfig>) -> Self {
+        let s3_config = AWSS3Config::new_with_settings(settings);
+        let aws_config = provider_config.get_aws_client_or_panic();
+        // Building AWS S3 config
+        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config);
+        // this is necessary for it to work with localstack in test cases
+        s3_config_builder.set_force_path_style(Some(true));
+        let client = Client::from_conf(s3_config_builder.build());
+        Self { client, bucket: s3_config.bucket_name }
     }
 }
diff --git a/crates/orchestrator/src/database/mongodb/config.rs b/crates/orchestrator/src/database/mongodb/config.rs
index 67c25b39..ab30812d 100644
--- a/crates/orchestrator/src/database/mongodb/config.rs
+++ b/crates/orchestrator/src/database/mongodb/config.rs
@@ -10,10 +10,6 @@ pub struct MongoDbConfig {
 impl DatabaseConfig for MongoDbConfig {
     fn new_with_settings(settings: &impl Settings) -> Self {
-        Self {
-            url: settings
-                .get_settings("MONGODB_CONNECTION_STRING")
-                .expect("Not able to get MONGODB_CONNECTION_STRING form the given settings"),
-        }
+        Self { url: settings.get_settings_or_panic("MONGODB_CONNECTION_STRING") }
     }
 }
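
Editorial note (not part of the diff): the repeated `get_settings(...)?` / `.expect(...)` pairs above all collapse into `get_settings_or_panic`. The `Settings` trait itself lives in the `utils::settings` crate and is not shown in this diff; a plausible sketch of the new method, assuming it is a provided method wrapping the fallible lookup:

```rust
// Hypothetical reconstruction of utils::settings after this PR (not in the diff).
use color_eyre::Result;

pub trait Settings {
    fn get_settings(&self, name: &str) -> Result<String>;

    // Fail fast with one uniform message instead of an ad-hoc `.expect(...)`
    // at every call site — the behavior the call sites above now rely on.
    fn get_settings_or_panic(&self, name: &str) -> String {
        self.get_settings(name)
            .unwrap_or_else(|e| panic!("Failed to get {name} from the settings provider: {e}"))
    }
}
```
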
diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs
index b50be76e..800cc195 100644
--- a/crates/orchestrator/src/jobs/da_job/mod.rs
+++ b/crates/orchestrator/src/jobs/da_job/mod.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::{Add, Mul, Rem};
 use std::str::FromStr;
+use std::sync::Arc;

 use async_trait::async_trait;
 use chrono::{SubsecRound, Utc};
@@ -15,12 +16,13 @@
 use thiserror::Error;
 use tracing::log;
 use uuid::Uuid;

-use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
-use super::{Job, JobError, OtherError};
 use crate::config::Config;
 use crate::constants::BLOB_DATA_FILE_NAME;
 use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

+use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
+use super::{Job, JobError, OtherError};
+
 lazy_static! {
     /// EIP-4844 BLS12-381 modulus.
     ///
@@ -62,7 +64,7 @@ pub struct DaJob;
 impl Job for DaJob {
     async fn create_job(
         &self,
-        _config: &Config,
+        _config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError> {
@@ -79,7 +81,7 @@
         })
     }

-    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+    async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
         let block_no = job
             .internal_id
             .parse::<u64>()
@@ -100,7 +102,7 @@
             MaybePendingStateUpdate::Update(state_update) => state_update,
         };
         // constructing the data from the rpc
-        let blob_data = state_update_to_blob_data(block_no, state_update, config)
+        let blob_data = state_update_to_blob_data(block_no, state_update, config.clone())
             .await
             .map_err(|e| JobError::Other(OtherError(e)))?;
         // transforming the data so that we can apply FFT on this.
@@ -110,7 +112,7 @@
         // data transformation on the data
         let transformed_data = fft_transformation(blob_data_biguint);

-        store_blob_data(transformed_data.clone(), block_no, config).await?;
+        store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;

         let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
         let max_blob_per_txn = config.da_client().max_blob_per_txn().await;
@@ -143,7 +145,7 @@
         Ok(external_id)
     }

-    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+    async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
         Ok(config
             .da_client()
             .verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
@@ -236,7 +238,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8
 pub async fn state_update_to_blob_data(
     block_no: u64,
     state_update: StateUpdate,
-    config: &Config,
+    config: Arc<Config>,
 ) -> color_eyre::Result<Vec<FieldElement>> {
     let mut state_diff = state_update.state_diff;
@@ -299,7 +301,7 @@
 }

 /// To store the blob data using the storage client with path <block_number>/blob_data.txt
-async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
+async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
     let storage_client = config.storage();
     let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
@@ -353,17 +355,12 @@ fn da_word(class_flag: bool, nonce_change: Option<FieldElement>, num_changes: u6
 #[cfg(test)]
 pub mod test {
-    use crate::jobs::da_job::da_word;
     use std::fs;
     use std::fs::File;
     use std::io::Read;
-    use std::sync::Arc;

-    use crate::config::config;
-    use crate::tests::config::TestConfigBuilder;
     use ::serde::{Deserialize, Serialize};
     use color_eyre::Result;
-    use da_client_interface::MockDaClient;
     use httpmock::prelude::*;
     use majin_blob_core::blob;
     use majin_blob_types::serde;
     use rstest::rstest;
     use serde_json::json;
     use starknet_core::types::{FieldElement, StateUpdate};
     use url::Url;

+    use da_client_interface::MockDaClient;
+
+    use crate::jobs::da_job::da_word;
+
     /// Tests `da_word` function with various inputs for class flag, new nonce, and number of changes.
     /// Verifies that `da_word` produces the correct FieldElement based on the provided parameters.
     /// Uses test cases with different combinations of inputs and expected output strings.
@@ -431,7 +432,10 @@ pub mod test {
         #[case] file_path: &str,
         #[case] nonce_file_path: &str,
     ) {
-        use crate::jobs::da_job::{convert_to_biguint, state_update_to_blob_data};
+        use crate::{
+            jobs::da_job::{convert_to_biguint, state_update_to_blob_data},
+            tests::config::TestConfigBuilder,
+        };

         let server = MockServer::start();
         let mut da_client = MockDaClient::new();
@@ -441,24 +445,21 @@
         da_client.expect_max_bytes_per_blob().with().returning(|| 131072);

         // Mocking storage client
-
         let provider = JsonRpcClient::new(HttpTransport::new(
             Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"),
         ));

         // mock block number (madara) : 5
-        TestConfigBuilder::new()
-            .mock_starknet_client(Arc::new(provider))
-            .mock_da_client(Box::new(da_client))
+        let services = TestConfigBuilder::new()
+            .configure_starknet_client(provider.into())
+            .configure_da_client(da_client.into())
             .build()
             .await;

-        let config = config().await;
-
         get_nonce_attached(&server, nonce_file_path);

         let state_update = read_state_update_from_file(state_update_file_path).expect("issue while reading");
-        let blob_data = state_update_to_blob_data(block_no, state_update, &config)
+        let blob_data = state_update_to_blob_data(block_no, state_update, services.config)
             .await
             .expect("issue while converting state update to blob data");
         let blob_data_biguint = convert_to_biguint(blob_data);
diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs
index 11da33d8..1508913d 100644
--- a/crates/orchestrator/src/jobs/mod.rs
+++ b/crates/orchestrator/src/jobs/mod.rs
@@ -1,8 +1,9 @@
 use std::collections::HashMap;
 use std::fmt;
+use std::sync::Arc;
 use std::time::Duration;

-use crate::config::{config, Config};
+use crate::config::Config;
 use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
 use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
 use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError};
@@ -100,18 +101,18 @@ pub trait Job: Send + Sync {
     /// Should build a new job item and return it
     async fn create_job(
         &self,
-        config: &Config,
+        config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError>;
     /// Should process the job and return the external_id which can be used to
     /// track the status of the job. For example, a DA job will submit the state diff
     /// to the DA layer and return the txn hash.
-    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError>;
+    async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError>;
     /// Should verify the job and return the status of the verification. For example,
     /// a DA job will verify the inclusion of the state diff in the DA layer and return
     /// the status of the verification.
-    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError>;
+    async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError>;
     /// Should return the maximum number of attempts to process the job. A new attempt is made
     /// every time the verification returns `JobVerificationStatus::Rejected`
     fn max_process_attempts(&self) -> u64;
@@ -127,8 +128,8 @@
 pub async fn create_job(
     job_type: JobType,
     internal_id: String,
     metadata: HashMap<String, String>,
+    config: Arc<Config>,
 ) -> Result<(), JobError> {
-    let config = config().await;
     let existing_job = config
         .database()
         .get_job_by_internal_id_and_type(internal_id.as_str(), &job_type)
@@ -139,19 +140,17 @@
     }

     let job_handler = factory::get_job_handler(&job_type).await;
-    let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?;
-
+    let job_item = job_handler.create_job(config.clone(), internal_id, metadata).await?;
     config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

-    add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
+    add_job_to_process_queue(job_item.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

     Ok(())
 }

 /// Processes the job, increments the process attempt count and updates the status of the job in the
 /// DB. It then adds the job to the verification queue.
-pub async fn process_job(id: Uuid) -> Result<(), JobError> {
-    let config = config().await;
-    let mut job = get_job(id).await?;
+pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
+    let mut job = get_job(id, config.clone()).await?;

     match job.status {
         // we only want to process jobs that are in the created or verification failed state.
@@ -173,12 +172,11 @@
         .map_err(|e| JobError::Other(OtherError(e)))?;

     let job_handler = factory::get_job_handler(&job.job_type).await;
-    let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;
-
+    let external_id = job_handler.process_job(config.clone(), &mut job).await?;
     let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

     // Fetching the job again because update status above will update the job version
-    let mut job_updated = get_job(id).await?;
+    let mut job_updated = get_job(id, config.clone()).await?;

     job_updated.external_id = external_id.into();
     job_updated.status = JobStatus::PendingVerification;
@@ -186,9 +184,13 @@
     config.database().update_job(&job_updated).await.map_err(|e| JobError::Other(OtherError(e)))?;

-    add_job_to_verification_queue(job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds()))
-        .await
-        .map_err(|e| JobError::Other(OtherError(e)))?;
+    add_job_to_verification_queue(
+        job.id,
+        Duration::from_secs(job_handler.verification_polling_delay_seconds()),
+        config.clone(),
+    )
+    .await
+    .map_err(|e| JobError::Other(OtherError(e)))?;

     Ok(())
 }
@@ -197,9 +199,8 @@
 /// retries processing the job if the max attempts have not been exceeded. If the max attempts have
 /// been exceeded, it marks the job as timed out. If the verification is still pending, it pushes the
 /// job back to the queue.
-pub async fn verify_job(id: Uuid) -> Result<(), JobError> {
-    let config = config().await;
-    let mut job = get_job(id).await?;
+pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
+    let mut job = get_job(id, config.clone()).await?;

     match job.status {
         JobStatus::PendingVerification => {
@@ -211,7 +212,7 @@
     }

     let job_handler = factory::get_job_handler(&job.job_type).await;
-    let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await?;
+    let verification_status = job_handler.verify_job(config.clone(), &mut job).await?;

     match verification_status {
         JobVerificationStatus::Verified => {
@@ -239,7 +240,7 @@
                         job.id,
                         process_attempts + 1
                     );
-                    add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
+                    add_job_to_process_queue(job.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;
                     return Ok(());
                 }
             }
@@ -261,6 +262,7 @@
                 add_job_to_verification_queue(
                     job.id,
                     Duration::from_secs(job_handler.verification_polling_delay_seconds()),
+                    config.clone(),
                 )
                 .await
                 .map_err(|e| JobError::Other(OtherError(e)))?;
@@ -272,10 +274,8 @@

 /// Terminates the job and updates the status of the job in the DB.
 /// Logs error if the job status `Completed` is existing on DL queue.
-pub async fn handle_job_failure(id: Uuid) -> Result<(), JobError> {
-    let config = config().await;
-
-    let mut job = get_job(id).await?.clone();
+pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
+    let mut job = get_job(id, config.clone()).await?.clone();
     let mut metadata = job.metadata.clone();

     if job.status == JobStatus::Completed {
@@ -297,8 +297,7 @@
     Ok(())
 }

-async fn get_job(id: Uuid) -> Result<JobItem, JobError> {
-    let config = config().await;
+async fn get_job(id: Uuid, config: Arc<Config>) -> Result<JobItem, JobError> {
     let job = config.database().get_job_by_id(id).await.map_err(|e| JobError::Other(OtherError(e)))?;
     match job {
         Some(job) => Ok(job),
diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs
index 3c81b2c9..c0d639ce 100644
--- a/crates/orchestrator/src/jobs/proving_job/mod.rs
+++ b/crates/orchestrator/src/jobs/proving_job/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::sync::Arc;

 use async_trait::async_trait;
 use cairo_vm::vm::runners::cairo_pie::CairoPie;
@@ -35,7 +36,7 @@ pub struct ProvingJob;
 impl Job for ProvingJob {
     async fn create_job(
         &self,
-        _config: &Config,
+        _config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError> {
@@ -52,7 +53,7 @@
         })
     }

-    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+    async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
         // Cairo Pie path in s3 storage client
         let cairo_pie_path = job.internal_id.to_string() + "/pie.zip";
         let cairo_pie_file = config
@@ -73,7 +74,7 @@
         Ok(external_id)
     }

-    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+    async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
         let task_id: String = job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?.into();
         let task_status = config
             .prover_client()
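
Editorial note (not part of the diff): every `config.clone()` threaded through the job and queue functions above clones the `Arc`, not the `Config` itself — a pointer copy plus an atomic refcount bump — so drilling the config through every handler is essentially free. A standalone illustration (names are illustrative):

```rust
use std::sync::Arc;

// Stand-in for the real Config, which holds several boxed clients.
struct BigConfig {
    payload: [u8; 1024],
}

fn consume(cfg: Arc<BigConfig>) -> usize {
    cfg.payload.len()
}

fn main() {
    let cfg = Arc::new(BigConfig { payload: [0; 1024] });
    // Each clone copies only the pointer; the payload is never duplicated.
    let a = consume(cfg.clone());
    let b = consume(cfg.clone());
    assert_eq!(a + b, 2048);
    // The two clones were moved into `consume` and dropped; only ours remains.
    assert_eq!(Arc::strong_count(&cfg), 1);
}
```
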
diff --git a/crates/orchestrator/src/jobs/register_proof_job/mod.rs b/crates/orchestrator/src/jobs/register_proof_job/mod.rs
index f7d958ad..3ae7d04c 100644
--- a/crates/orchestrator/src/jobs/register_proof_job/mod.rs
+++ b/crates/orchestrator/src/jobs/register_proof_job/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::sync::Arc;

 use async_trait::async_trait;
 use chrono::{SubsecRound, Utc};
@@ -17,7 +18,7 @@ pub struct RegisterProofJob;
 impl Job for RegisterProofJob {
     async fn create_job(
         &self,
-        _config: &Config,
+        _config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError> {
@@ -36,14 +37,14 @@
         })
     }

-    async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result<String, JobError> {
+    async fn process_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<String, JobError> {
         // Get proof from storage and submit on chain for verification
         // We need to implement a generic trait for this to support multiple
         // base layers
         todo!()
     }

-    async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+    async fn verify_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
         // verify that the proof transaction has been included on chain
         todo!()
     }
diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs
index 02898767..5fd01463 100644
--- a/crates/orchestrator/src/jobs/snos_job/mod.rs
+++ b/crates/orchestrator/src/jobs/snos_job/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::sync::Arc;

 use async_trait::async_trait;
 use chrono::{SubsecRound, Utc};
@@ -17,7 +18,7 @@ pub struct SnosJob;
 impl Job for SnosJob {
     async fn create_job(
         &self,
-        _config: &Config,
+        _config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError> {
@@ -34,14 +35,14 @@
         })
     }

-    async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result<String, JobError> {
+    async fn process_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<String, JobError> {
         // 1. Fetch SNOS input data from Madara
         // 2. Import SNOS in Rust and execute it with the input data
         // 3. Store the received PIE in DB
         todo!()
     }

-    async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+    async fn verify_job(&self, _config: Arc<Config>, _job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
         // No need for verification as of now. If we later on decide to outsource SNOS run
         // to another service, verify_job can be used to poll on the status of the job
         todo!()
diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs
index fa5d897f..afa24e28 100644
--- a/crates/orchestrator/src/jobs/state_update_job/mod.rs
+++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs
@@ -1,6 +1,7 @@
 pub mod utils;

 use std::collections::HashMap;
+use std::sync::Arc;

 use ::utils::collections::{has_dup, is_sorted};
 use async_trait::async_trait;
@@ -19,7 +20,7 @@ use super::constants::{
 };
 use super::{JobError, OtherError};

-use crate::config::{config, Config};
+use crate::config::Config;
 use crate::constants::SNOS_OUTPUT_FILE_NAME;
 use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
 use crate::jobs::state_update_job::utils::{fetch_blob_data_for_block, fetch_program_data_for_block};
@@ -73,7 +74,7 @@ pub struct StateUpdateJob;
 impl Job for StateUpdateJob {
     async fn create_job(
         &self,
-        _config: &Config,
+        _config: Arc<Config>,
         internal_id: String,
         metadata: HashMap<String, String>,
     ) -> Result<JobItem, JobError> {
@@ -98,7 +99,7 @@
         })
     }

-    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+    async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
         let attempt_no = job
             .metadata
             .get(JOB_PROCESS_ATTEMPT_METADATA_KEY)
@@ -108,7 +109,7 @@
         // Read the metadata to get the blocks for which state update will be performed.
         // We assume that blocks nbrs are formatted as follow: "2,3,4,5,6".
         let mut block_numbers = self.get_block_numbers_from_metadata(job)?;
-        self.validate_block_numbers(config, &block_numbers).await?;
+        self.validate_block_numbers(config.clone(), &block_numbers).await?;

         // If we had a block state update failing last run, we recover from this block
         if let Some(last_failed_block) = job.metadata.get(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO) {
@@ -122,8 +123,8 @@
         let mut sent_tx_hashes: Vec<String> = Vec::with_capacity(block_numbers.len());
         for block_no in block_numbers.iter() {
-            let snos = self.fetch_snos_for_block(*block_no).await;
-            let tx_hash = self.update_state_for_block(config, *block_no, snos, nonce).await.map_err(|e| {
+            let snos = self.fetch_snos_for_block(*block_no, config.clone()).await;
+            let tx_hash = self.update_state_for_block(config.clone(), *block_no, snos, nonce).await.map_err(|e| {
                 job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string());
                 self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes);
@@ -148,7 +149,7 @@
     /// Status will be verified if:
     /// 1. the last settlement tx hash is successful,
     /// 2. the expected last settled block from our configuration is indeed the one found in the provider.
-    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+    async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
         let attempt_no = job
             .metadata
             .get(JOB_PROCESS_ATTEMPT_METADATA_KEY)
@@ -254,7 +255,7 @@
     }

     /// Validate that the list of block numbers to process is valid.
-    async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> Result<(), JobError> {
+    async fn validate_block_numbers(&self, config: Arc<Config>, block_numbers: &[u64]) -> Result<(), JobError> {
         if block_numbers.is_empty() {
             Err(StateUpdateError::BlockNumberNotFound)?;
         }
@@ -276,7 +277,7 @@
     /// Update the state for the corresponding block using the settlement layer.
     async fn update_state_for_block(
         &self,
-        config: &Config,
+        config: Arc<Config>,
         block_no: u64,
         snos: StarknetOsOutput,
         nonce: u64,
@@ -285,9 +286,13 @@
         let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO {
             unimplemented!("update_state_for_block not implemented as of now for calldata DA.")
         } else if snos.use_kzg_da == Felt252::ONE {
-            let blob_data = fetch_blob_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?;
-            let program_output =
-                fetch_program_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?;
+            let blob_data = fetch_blob_data_for_block(block_no, config.clone())
+                .await
+                .map_err(|e| JobError::Other(OtherError(e)))?;
+
+            let program_output = fetch_program_data_for_block(block_no, config.clone())
+                .await
+                .map_err(|e| JobError::Other(OtherError(e)))?;
             // TODO :
             // Fetching nonce before the transaction is run
             // Sending update_state transaction from the settlement client
@@ -302,8 +307,7 @@
     }

     /// Retrieves the SNOS output for the corresponding block.
-    async fn fetch_snos_for_block(&self, block_no: u64) -> StarknetOsOutput {
-        let config = config().await;
+    async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> StarknetOsOutput {
         let storage_client = config.storage();
         let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
         let snos_output_bytes = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
diff --git a/crates/orchestrator/src/jobs/state_update_job/utils.rs b/crates/orchestrator/src/jobs/state_update_job/utils.rs
index 4a749a55..3eea495e 100644
--- a/crates/orchestrator/src/jobs/state_update_job/utils.rs
+++ b/crates/orchestrator/src/jobs/state_update_job/utils.rs
@@ -1,16 +1,17 @@
+use std::sync::Arc;
+
+use crate::config::Config;
 use std::fmt::Write;
 use std::io::{BufRead, Cursor};
 use std::str::FromStr;

-use crate::config::config;
 use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME};
 use alloy::primitives::U256;
 use color_eyre::eyre::eyre;
 use num_bigint::BigUint;

 /// Fetching the blob data (stored in remote storage during DA job) for a particular block
-pub async fn fetch_blob_data_for_block(block_number: u64) -> color_eyre::Result<Vec<Vec<u8>>> {
-    let config = config().await;
+pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc<Config>) -> color_eyre::Result<Vec<Vec<u8>>> {
     let storage_client = config.storage();
     let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
     let blob_data = storage_client.get_data(&key).await?;
@@ -18,8 +19,7 @@
 }

 /// Fetching the blob data (stored in remote storage during DA job) for a particular block
-pub async fn fetch_program_data_for_block(block_number: u64) -> color_eyre::Result<Vec<[u8; 32]>> {
-    let config = config().await;
+pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>) -> color_eyre::Result<Vec<[u8; 32]>> {
     let storage_client = config.storage();
     let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
     let blob_data = storage_client.get_data(&key).await?;
diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs
index 1739f453..dfeecd6d 100644
--- a/crates/orchestrator/src/main.rs
+++ b/crates/orchestrator/src/main.rs
@@ -1,5 +1,5 @@
 use dotenvy::dotenv;
-use orchestrator::config::config;
+use orchestrator::config::init_config;
 use orchestrator::queue::init_consumers;
 use orchestrator::routes::app_router;
 use utils::env_utils::get_env_var_or_default;
@@ -11,7 +11,8 @@ async fn main() {
     tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).with_target(false).init();

     // initial config setup
-    config().await;
+    let config = init_config().await;
+
     let host = get_env_var_or_default("HOST", "127.0.0.1");
     let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
     let address = format!("{}:{}", host, port);
@@ -19,7 +20,7 @@
     let app = app_router();

     // init consumer
-    init_consumers().await.expect("Failed to init consumers");
+    init_consumers(config).await.expect("Failed to init consumers");

     tracing::info!("Listening on http://{}", address);
     axum::serve(listener, app).await.expect("Failed to start axum server");
diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs
index ff76d04f..70172b3f 100644
--- a/crates/orchestrator/src/queue/job_queue.rs
+++ b/crates/orchestrator/src/queue/job_queue.rs
@@ -1,17 +1,25 @@
 use std::future::Future;
 use std::str::FromStr;
+use std::sync::Arc;
 use std::time::Duration;

 use color_eyre::eyre::Context;
 use color_eyre::Result as EyreResult;
 use omniqueue::{Delivery, QueueError};
 use serde::{Deserialize, Deserializer, Serialize};
+use thiserror::Error;
 use tokio::time::sleep;
 use tracing::log;
 use uuid::Uuid;

-use crate::config::config;
+use crate::config::Config;
 use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherError};
+use crate::workers::data_submission_worker::DataSubmissionWorker;
+use crate::workers::proof_registration::ProofRegistrationWorker;
+use crate::workers::proving::ProvingWorker;
+use crate::workers::snos::SnosWorker;
+use crate::workers::update_state::UpdateStateWorker;
+use crate::workers::Worker;

 pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
 pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
@@ -21,14 +29,6 @@ pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failu
 // Queues for SNOS worker trigger listening
 pub const WORKER_TRIGGER_QUEUE: &str = "madara_orchestrator_worker_trigger_queue";

-use crate::workers::data_submission_worker::DataSubmissionWorker;
-use crate::workers::proof_registration::ProofRegistrationWorker;
-use crate::workers::proving::ProvingWorker;
-use crate::workers::snos::SnosWorker;
-use crate::workers::update_state::UpdateStateWorker;
-use crate::workers::Worker;
-use thiserror::Error;
-
 #[derive(Error, Debug, PartialEq)]
 pub enum ConsumptionError {
     #[error("Failed to consume message from queue, error {error_msg:?}")]
@@ -103,23 +103,28 @@ enum DeliveryReturnType {
     NoMessage,
 }

-pub async fn add_job_to_process_queue(id: Uuid) -> EyreResult<()> {
+pub async fn add_job_to_process_queue(id: Uuid, config: Arc<Config>) -> EyreResult<()> {
     log::info!("Adding job with id {:?} to processing queue", id);
-    add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None).await
+    add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None, config).await
 }
-pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration) -> EyreResult<()> {
+pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration, config: Arc<Config>) -> EyreResult<()> {
     log::info!("Adding job with id {:?} to verification queue", id);
-    add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay)).await
+    add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay), config).await
 }

-pub async fn consume_job_from_queue<F, Fut>(queue: String, handler: F) -> Result<(), ConsumptionError>
+pub async fn consume_job_from_queue<F, Fut>(
+    queue: String,
+    handler: F,
+    config: Arc<Config>,
+) -> Result<(), ConsumptionError>
 where
-    F: FnOnce(Uuid) -> Fut,
+    F: FnOnce(Uuid, Arc<Config>) -> Fut,
     Fut: Future<Output = Result<(), JobError>>,
 {
     log::debug!("Consuming from queue {:?}", queue);
-    let delivery = get_delivery_from_queue(&queue).await?;
+
+    let delivery = get_delivery_from_queue(&queue, config.clone()).await?;

     let message = match delivery {
         DeliveryReturnType::Message(message) => message,
@@ -129,7 +134,7 @@
     let job_message = parse_job_message(&message)?;

     if let Some(job_message) = job_message {
-        handle_job_message(job_message, message, handler).await?;
+        handle_job_message(job_message, message, handler, config).await?;
     }

     Ok(())
@@ -140,13 +145,14 @@
 pub async fn consume_worker_trigger_messages_from_queue<F, Fut>(
     queue: String,
     handler: F,
+    config: Arc<Config>,
 ) -> Result<(), ConsumptionError>
 where
-    F: FnOnce(Box<dyn Worker>) -> Fut,
+    F: FnOnce(Box<dyn Worker>, Arc<Config>) -> Fut,
     Fut: Future<Output = color_eyre::Result<()>>,
 {
     log::debug!("Consuming from queue {:?}", queue);
-    let delivery = get_delivery_from_queue(&queue).await?;
+    let delivery = get_delivery_from_queue(&queue, config.clone()).await?;

     let message = match delivery {
         DeliveryReturnType::Message(message) => message,
@@ -156,7 +162,7 @@
     let job_message = parse_worker_message(&message)?;

     if let Some(job_message) = job_message {
-        handle_worker_message(job_message, message, handler).await?;
+        handle_worker_message(job_message, message, handler, config).await?;
     }

     Ok(())
@@ -180,14 +186,15 @@ async fn handle_job_message<F, Fut>(
     job_message: JobQueueMessage,
     message: Delivery,
     handler: F,
+    config: Arc<Config>,
 ) -> Result<(), ConsumptionError>
 where
-    F: FnOnce(Uuid) -> Fut,
+    F: FnOnce(Uuid, Arc<Config>) -> Fut,
     Fut: Future<Output = Result<(), JobError>>,
 {
     log::info!("Handling job with id {:?}", job_message.id);

-    match handler(job_message.id).await {
+    match handler(job_message.id, config.clone()).await {
         Ok(_) => {
             message
                 .ack()
@@ -199,8 +206,7 @@
         }
         Err(e) => {
             log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
-            config()
-                .await
+            config
                 .alerts()
                 .send_alert_message(e.to_string())
                 .await
@@ -224,15 +230,16 @@ async fn handle_worker_message<F, Fut>(
     job_message: WorkerTriggerMessage,
     message: Delivery,
     handler: F,
+    config: Arc<Config>,
 ) -> Result<(), ConsumptionError>
 where
-    F: FnOnce(Box<dyn Worker>) -> Fut,
+    F: FnOnce(Box<dyn Worker>, Arc<Config>) -> Fut,
     Fut: Future<Output = color_eyre::Result<()>>,
 {
     log::info!("Handling worker trigger for worker type : {:?}", job_message.worker);
     let worker_handler = get_worker_handler_from_worker_trigger_type(job_message.worker.clone());

-    match handler(worker_handler).await {
+    match handler(worker_handler, config.clone()).await {
         Ok(_) => {
             message
                 .ack()
@@ -244,8 +251,7 @@
         }
         Err(e) => {
Error: {:?}", job_message.worker, e); - config() - .await + config .alerts() .send_alert_message(e.to_string()) .await @@ -272,8 +278,8 @@ fn get_worker_handler_from_worker_trigger_type(worker_trigger_type: WorkerTrigge } /// To get the delivery from the message queue using the queue name -async fn get_delivery_from_queue(queue: &str) -> Result { - match config().await.queue().consume_message_from_queue(queue.to_string()).await { +async fn get_delivery_from_queue(queue: &str, config: Arc) -> Result { + match config.queue().consume_message_from_queue(queue.to_string()).await { Ok(d) => Ok(DeliveryReturnType::Message(d)), Err(QueueError::NoData) => Ok(DeliveryReturnType::NoMessage), Err(e) => Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }), @@ -281,10 +287,11 @@ async fn get_delivery_from_queue(queue: &str) -> Result { + ($queue_type :expr, $handler : expr, $consume_function: expr, $config :expr) => { + let config_clone = $config.clone(); tokio::spawn(async move { loop { - match $consume_function($queue_type, $handler).await { + match $consume_function($queue_type, $handler, config_clone.clone()).await { Ok(_) => {} Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e), } @@ -294,22 +301,21 @@ macro_rules! spawn_consumer { }; } -pub async fn init_consumers() -> Result<(), JobError> { - spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue); - spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue); - spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue); - spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue); +pub async fn init_consumers(config: Arc) -> Result<(), JobError> { + spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); + spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue, config.clone()); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue, config.clone()); + spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue, config); Ok(()) } /// To spawn the worker by passing the worker struct -async fn spawn_worker(worker: Box) -> color_eyre::Result<()> { - worker.run_worker_if_enabled().await.expect("Error in running the worker."); +async fn spawn_worker(worker: Box, config: Arc) -> color_eyre::Result<()> { + worker.run_worker_if_enabled(config).await.expect("Error in running the worker."); Ok(()) } -async fn add_job_to_queue(id: Uuid, queue: String, delay: Option) -> EyreResult<()> { - let config = config().await; +async fn add_job_to_queue(id: Uuid, queue: String, delay: Option, config: Arc) -> EyreResult<()> { let message = JobQueueMessage { id }; config.queue().send_message_to_queue(queue, serde_json::to_string(&message)?, delay).await?; Ok(()) diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 48599364..fd2215cb 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -1,14 +1,14 @@ pub mod job_queue; pub mod sqs; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use color_eyre::Result as EyreResult; use mockall::automock; use omniqueue::{Delivery, QueueError}; -use crate::jobs::JobError; +use crate::{config::Config, jobs::JobError}; /// The 
diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs
index 48599364..fd2215cb 100644
--- a/crates/orchestrator/src/queue/mod.rs
+++ b/crates/orchestrator/src/queue/mod.rs
@@ -1,14 +1,14 @@
 pub mod job_queue;
 pub mod sqs;

-use std::time::Duration;
+use std::{sync::Arc, time::Duration};

 use async_trait::async_trait;
 use color_eyre::Result as EyreResult;
 use mockall::automock;
 use omniqueue::{Delivery, QueueError};

-use crate::jobs::JobError;
+use crate::{config::Config, jobs::JobError};

 /// The QueueProvider trait is used to define the methods that a queue
 /// should implement to be used as a queue for the orchestrator. The
@@ -20,6 +20,6 @@ pub trait QueueProvider: Send + Sync {
     async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
 }

-pub async fn init_consumers() -> Result<(), JobError> {
-    job_queue::init_consumers().await
+pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
+    job_queue::init_consumers(config).await
 }
diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs
index 2026061b..78b92669 100644
--- a/crates/orchestrator/src/tests/alerts/mod.rs
+++ b/crates/orchestrator/src/tests/alerts/mod.rs
@@ -1,25 +1,26 @@
-use crate::config::config;
-use crate::tests::common::{get_sns_client, get_sqs_client};
-use crate::tests::config::TestConfigBuilder;
+use std::time::Duration;
+
 use aws_sdk_sqs::types::QueueAttributeName::QueueArn;
 use rstest::rstest;
-use std::time::Duration;
 use tokio::time::sleep;
+
 use utils::env_utils::get_env_var_or_panic;

+use crate::tests::common::{get_sns_client, get_sqs_client};
+use crate::tests::config::{ConfigType, TestConfigBuilder};
+
 pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue";

 #[rstest]
 #[tokio::test]
 async fn sns_alert_subscribe_to_topic_receive_alert_works() {
-    TestConfigBuilder::new().build().await;
+    let services = TestConfigBuilder::new().configure_alerts(ConfigType::Actual).build().await;

-    let sqs_client = get_sqs_client().await;
+    let sqs_client = get_sqs_client(services.provider_config.clone()).await;
     let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap();
     let queue_url = queue.queue_url().unwrap();

-    let sns_client = get_sns_client().await;
-    let config = config().await;
+    let sns_client = get_sns_client(services.provider_config.get_aws_client_or_panic()).await;

     let queue_attributes =
         sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap();
@@ -39,7 +40,7 @@
     let message_to_send = "Hello World :)";

     // Getting sns client from the module
-    let alerts_client = config.alerts();
+    let alerts_client = services.config.alerts();

     // Sending the alert message
     alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap();
diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs
index 6e183b7d..5c27e96a 100644
--- a/crates/orchestrator/src/tests/common/mod.rs
+++ b/crates/orchestrator/src/tests/common/mod.rs
@@ -3,9 +3,9 @@ pub mod constants;
 use std::collections::HashMap;
 use std::sync::Arc;

-use crate::config::{get_aws_config, ProviderConfig};
+use crate::config::ProviderConfig;
 use ::uuid::Uuid;
-use aws_config::Region;
+use aws_config::SdkConfig;
 use aws_sdk_sns::error::SdkError;
 use aws_sdk_sns::operation::create_topic::CreateTopicError;
 use chrono::{SubsecRound, Utc};
@@ -46,16 +46,14 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))]
     job_item
 }

-pub async fn create_sns_arn() -> Result<(), SdkError<CreateTopicError>> {
-    let sns_client = get_sns_client().await;
+pub async fn create_sns_arn(provider_config: Arc<ProviderConfig>) -> Result<(), SdkError<CreateTopicError>> {
+    let sns_client = get_sns_client(provider_config.get_aws_client_or_panic()).await;
     sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?;
     Ok(())
 }

-pub async fn get_sns_client() -> aws_sdk_sns::client::Client {
get_env_var_or_panic("AWS_SNS_REGION"); - let config = aws_config::from_env().region(Region::new(sns_region)).load().await; - aws_sdk_sns::Client::new(&config) +pub async fn get_sns_client(aws_config: &SdkConfig) -> aws_sdk_sns::client::Client { + aws_sdk_sns::Client::new(aws_config) } pub async fn drop_database() -> color_eyre::Result<()> { @@ -69,8 +67,8 @@ pub async fn drop_database() -> color_eyre::Result<()> { // SQS structs & functions -pub async fn create_sqs_queues() -> color_eyre::Result<()> { - let sqs_client = get_sqs_client().await; +pub async fn create_sqs_queues(provider_config: Arc) -> color_eyre::Result<()> { + let sqs_client = get_sqs_client(provider_config).await; // Dropping sqs queues let list_queues_output = sqs_client.list_queues().send().await?; @@ -89,11 +87,10 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> { Ok(()) } -pub async fn get_sqs_client() -> aws_sdk_sqs::Client { +pub async fn get_sqs_client(provider_config: Arc) -> aws_sdk_sqs::Client { // This function is for localstack. So we can hardcode the region for this as of now. - let region_provider = Region::new("us-east-1"); - let config = aws_config::from_env().region(region_provider).load().await; - aws_sdk_sqs::Client::new(&config) + let config = provider_config.get_aws_client_or_panic(); + aws_sdk_sqs::Client::new(config) } #[derive(Deserialize, Debug)] @@ -101,12 +98,6 @@ pub struct MessagePayloadType { pub(crate) id: Uuid, } -pub async fn get_storage_client() -> Box { - Box::new( - AWSS3::new_with_settings( - &EnvSettingsProvider {}, - ProviderConfig::AWS(Arc::new(get_aws_config(&EnvSettingsProvider {}).await)), - ) - .await, - ) +pub async fn get_storage_client(provider_config: Arc) -> Box { + Box::new(AWSS3::new_with_settings(&EnvSettingsProvider {}, provider_config).await) } diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index e817474e..10085d1b 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,49 +1,94 @@ use std::sync::Arc; -use crate::config::{ - build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, - get_aws_config, Config, ProviderConfig, -}; +use crate::config::{get_aws_config, Config, ProviderConfig}; use crate::data_storage::DataStorage; use da_client_interface::DaClient; use httpmock::MockServer; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; + +use da_client_interface::MockDaClient; +use prover_client_interface::{MockProverClient, ProverClient}; +use settlement_client_interface::{MockSettlementClient, SettlementClient}; use crate::alerts::Alerts; -use prover_client_interface::ProverClient; -use settlement_client_interface::SettlementClient; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::{JsonRpcClient, Url}; -use utils::env_utils::get_env_var_or_panic; +use crate::data_storage::MockDataStorage; +use crate::database::{Database, MockDatabase}; +use crate::queue::{MockQueueProvider, QueueProvider}; +use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database}; use utils::settings::env::EnvSettingsProvider; -use crate::database::mongodb::MongoDb; -use crate::database::Database; -use crate::queue::sqs::SqsQueue; -use crate::queue::QueueProvider; -use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client}; - // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // 
 // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement.
 // Eg: We want to mock only the da client and leave rest to be as it is, use mock_da_client.

+pub enum MockType {
+    StarknetClient(Arc<JsonRpcClient<HttpTransport>>),
+    DaClient(Box<dyn DaClient>),
+    ProverClient(Box<dyn ProverClient>),
+    SettlementClient(Box<dyn SettlementClient>),
+
+    Alerts(Box<dyn Alerts>),
+    Database(Box<dyn Database>),
+    Queue(Box<dyn QueueProvider>),
+    Storage(Box<dyn DataStorage>),
+}
+
+// By default, everything is on Dummy.
+#[derive(Default)]
+pub enum ConfigType {
+    Mock(MockType),
+    Actual,
+    #[default]
+    Dummy,
+}
+
+impl From<JsonRpcClient<HttpTransport>> for ConfigType {
+    fn from(client: JsonRpcClient<HttpTransport>) -> Self {
+        ConfigType::Mock(MockType::StarknetClient(Arc::new(client)))
+    }
+}
+
+macro_rules! impl_mock_from {
+    ($($mock_type:ty => $variant:ident),+) => {
+        $(
+            impl From<$mock_type> for ConfigType {
+                fn from(client: $mock_type) -> Self {
+                    ConfigType::Mock(MockType::$variant(Box::new(client)))
+                }
+            }
+        )+
+    };
+}
+
+impl_mock_from! {
+    MockProverClient => ProverClient,
+    MockDatabase => Database,
+    MockDaClient => DaClient,
+    MockQueueProvider => Queue,
+    MockDataStorage => Storage,
+    MockSettlementClient => SettlementClient
+}
+
 // TestBuilder for Config
 pub struct TestConfigBuilder {
     /// The starknet client to get data from the node
-    starknet_client: Option<Arc<JsonRpcClient<HttpTransport>>>,
+    starknet_client_type: ConfigType,
     /// The DA client to interact with the DA layer
-    da_client: Option<Box<dyn DaClient>>,
-    /// The service that produces proof and registers it onchain
-    prover_client: Option<Box<dyn ProverClient>>,
+    da_client_type: ConfigType,
+    /// The service that produces proof and registers it on chain
+    prover_client_type: ConfigType,
     /// Settlement client
-    settlement_client: Option<Box<dyn SettlementClient>>,
+    settlement_client_type: ConfigType,
+
+    /// Alerts client
+    alerts_type: ConfigType,
     /// The database client
-    database: Option<Box<dyn Database>>,
+    database_type: ConfigType,
     /// Queue client
-    queue: Option<Box<dyn QueueProvider>>,
+    queue_type: ConfigType,
     /// Storage client
-    storage: Option<Box<dyn DataStorage>>,
-    /// Alerts client
-    alerts: Option<Box<dyn Alerts>>,
+    storage_type: ConfigType,
 }

 impl Default for TestConfigBuilder {
@@ -52,127 +97,278 @@
     }
 }

+pub struct TestConfigBuilderReturns {
+    pub server: Option<MockServer>,
+    pub config: Arc<Config>,
+    pub provider_config: Arc<ProviderConfig>,
+}
+
 impl TestConfigBuilder {
     /// Create a new config
     pub fn new() -> TestConfigBuilder {
         TestConfigBuilder {
-            starknet_client: None,
-            da_client: None,
-            prover_client: None,
-            settlement_client: None,
-            database: None,
-            queue: None,
-            storage: None,
-            alerts: None,
+            starknet_client_type: ConfigType::default(),
+            da_client_type: ConfigType::default(),
+            prover_client_type: ConfigType::default(),
+            settlement_client_type: ConfigType::default(),
+            database_type: ConfigType::default(),
+            queue_type: ConfigType::default(),
+            storage_type: ConfigType::default(),
+            alerts_type: ConfigType::default(),
         }
     }

-    pub fn mock_da_client(mut self, da_client: Box<dyn DaClient>) -> TestConfigBuilder {
-        self.da_client = Some(da_client);
+    pub fn configure_da_client(mut self, da_client_type: ConfigType) -> TestConfigBuilder {
+        self.da_client_type = da_client_type;
         self
     }

-    pub fn mock_db_client(mut self, db_client: Box<dyn Database>) -> TestConfigBuilder {
-        self.database = Some(db_client);
+    pub fn configure_settlement_client(mut self, settlement_client_type: ConfigType) -> TestConfigBuilder {
+        self.settlement_client_type = settlement_client_type;
         self
     }

-    pub fn mock_settlement_client(mut self, settlement_client: Box<dyn SettlementClient>) -> TestConfigBuilder {
-        self.settlement_client = Some(settlement_client);
+    pub fn configure_starknet_client(mut self, starknet_client_type: ConfigType) -> TestConfigBuilder {
self.starknet_client_type = starknet_client_type; self } - pub fn mock_starknet_client(mut self, starknet_client: Arc>) -> TestConfigBuilder { - self.starknet_client = Some(starknet_client); + pub fn configure_prover_client(mut self, prover_client_type: ConfigType) -> TestConfigBuilder { + self.prover_client_type = prover_client_type; self } - pub fn mock_prover_client(mut self, prover_client: Box) -> TestConfigBuilder { - self.prover_client = Some(prover_client); + pub fn configure_alerts(mut self, alert_option: ConfigType) -> TestConfigBuilder { + self.alerts_type = alert_option; self } - pub fn mock_storage_client(mut self, storage_client: Box) -> TestConfigBuilder { - self.storage = Some(storage_client); + pub fn configure_storage_client(mut self, storage_client_option: ConfigType) -> TestConfigBuilder { + self.storage_type = storage_client_option; self } - pub fn mock_queue(mut self, queue: Box) -> TestConfigBuilder { - self.queue = Some(queue); + pub fn configure_queue_client(mut self, queue_type: ConfigType) -> TestConfigBuilder { + self.queue_type = queue_type; self } - - pub fn mock_alerts(mut self, alerts: Box) -> TestConfigBuilder { - self.alerts = Some(alerts); + pub fn configure_database(mut self, database_type: ConfigType) -> TestConfigBuilder { + self.database_type = database_type; self } - pub async fn build(mut self) -> MockServer { - dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); + pub async fn build(self) -> TestConfigBuilderReturns { + dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file"); - let server = MockServer::start(); let settings_provider = EnvSettingsProvider {}; - let aws_config = get_aws_config(&settings_provider).await; + let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await))); + + use std::sync::Arc; + + let TestConfigBuilder { + starknet_client_type, + alerts_type, + da_client_type, + prover_client_type, + settlement_client_type, + database_type, + queue_type, + storage_type, + } = self; + + let (starknet_client, server) = implement_client::init_starknet_client(starknet_client_type).await; + let alerts = implement_client::init_alerts(alerts_type, &settings_provider, provider_config.clone()).await; + let da_client = implement_client::init_da_client(da_client_type, &settings_provider).await; + + let settlement_client = + implement_client::init_settlement_client(settlement_client_type, &settings_provider).await; + + let prover_client = implement_client::init_prover_client(prover_client_type, &settings_provider).await; + + // External Dependencies + let storage = implement_client::init_storage_client(storage_type, provider_config.clone()).await; + let database = implement_client::init_database(database_type, settings_provider).await; + let queue = implement_client::init_queue_client(queue_type).await; + // Deleting and Creating the queues in sqs. 
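The comment above marks the reset phase of `build()`: the calls that follow wipe and recreate the SQS queues, drop the Mongo database, and recreate the SNS topic, so every test starts from clean external state. The queue half of that reset is, roughly, a delete-then-create pass over the LocalStack queues — a sketch under the `aws_sdk_sqs` API used in `tests::common` (`reset_queues` is an illustrative name):

```rust
async fn reset_queues(client: &aws_sdk_sqs::Client, names: &[&str]) -> color_eyre::Result<()> {
    // Drop whatever queues are left over from the previous run.
    if let Some(urls) = client.list_queues().send().await?.queue_urls {
        for url in urls {
            client.delete_queue().queue_url(url).send().await?;
        }
    }
    // Recreate a fresh, empty queue per name.
    for name in names {
        client.create_queue().queue_name(*name).send().await?;
    }
    Ok(())
}
```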
+ create_sqs_queues(provider_config.clone()).await.expect("Not able to delete and create the queues."); + // Deleting the database + drop_database().await.expect("Unable to drop the database."); + // Creating the SNS ARN + create_sns_arn(provider_config.clone()).await.expect("Unable to create the sns arn"); - // init database - if self.database.is_none() { - self.database = Some(Box::new(MongoDb::new_with_settings(&settings_provider).await)); + let config = Arc::new(Config::new( + starknet_client, + da_client, + prover_client, + settlement_client, + database, + queue, + storage, + alerts, + )); + + TestConfigBuilderReturns { server, config, provider_config: provider_config.clone() } + } +} + +pub mod implement_client { + use std::sync::Arc; + + use crate::alerts::{Alerts, MockAlerts}; + use crate::config::{ + build_alert_client, build_da_client, build_database_client, build_prover_service, build_queue_client, + build_settlement_client, ProviderConfig, + }; + use crate::data_storage::{DataStorage, MockDataStorage}; + use crate::database::{Database, MockDatabase}; + use crate::queue::{MockQueueProvider, QueueProvider}; + use crate::tests::common::get_storage_client; + use da_client_interface::{DaClient, MockDaClient}; + use httpmock::MockServer; + use prover_client_interface::{MockProverClient, ProverClient}; + use settlement_client_interface::{MockSettlementClient, SettlementClient}; + use starknet::providers::jsonrpc::HttpTransport; + use starknet::providers::{JsonRpcClient, Url}; + use utils::env_utils::get_env_var_or_panic; + use utils::settings::env::EnvSettingsProvider; + use utils::settings::Settings; + + use super::ConfigType; + use super::MockType; + + macro_rules! implement_mock_client_conversion { + ($client_type:ident, $mock_variant:ident) => { + impl From<MockType> for Box<dyn $client_type> { + fn from(client: MockType) -> Self { + if let MockType::$mock_variant(service_client) = client { + service_client + } else { + panic!(concat!("Mock client is not a ", stringify!($client_type))); + } + } + } + }; + } + + implement_mock_client_conversion!(DataStorage, Storage); + implement_mock_client_conversion!(QueueProvider, Queue); + implement_mock_client_conversion!(Database, Database); + implement_mock_client_conversion!(Alerts, Alerts); + implement_mock_client_conversion!(ProverClient, ProverClient); + implement_mock_client_conversion!(SettlementClient, SettlementClient); + implement_mock_client_conversion!(DaClient, DaClient); + + pub(crate) async fn init_da_client(service: ConfigType, settings_provider: &impl Settings) -> Box<dyn DaClient> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_da_client(settings_provider).await, + ConfigType::Dummy => Box::new(MockDaClient::new()), + } + } - // init the DA client - if self.da_client.is_none() { - self.da_client = Some(build_da_client(&settings_provider).await); + pub(crate) async fn init_settlement_client( + service: ConfigType, + settings_provider: &impl Settings, + ) -> Box<dyn SettlementClient> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_settlement_client(settings_provider).await, + ConfigType::Dummy => Box::new(MockSettlementClient::new()), } + } + + pub(crate) async fn init_prover_client( + service: ConfigType, + settings_provider: &impl Settings, + ) -> Box<dyn ProverClient> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_prover_service(settings_provider), + ConfigType::Dummy => Box::new(MockProverClient::new()), } + }
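For readers tracing `implement_mock_client_conversion!` above: invoking it as `implement_mock_client_conversion!(DataStorage, Storage)` expands to roughly the following, which is what lets each `init_*` helper unwrap a `MockType` with a plain `client.into()`:

```rust
impl From<MockType> for Box<dyn DataStorage> {
    fn from(client: MockType) -> Self {
        if let MockType::Storage(service_client) = client {
            service_client
        } else {
            // concat!/stringify! bake the trait name into the message.
            panic!("Mock client is not a DataStorage");
        }
    }
}
```

The panic is acceptable here because a mismatched variant can only come from a bug in the test itself.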
- // init the Settings client - if self.settlement_client.is_none() { - self.settlement_client = Some(build_settlement_client(&settings_provider).await); + pub(crate) async fn init_alerts( + service: ConfigType, + settings_provider: &impl Settings, + provider_config: Arc<ProviderConfig>, + ) -> Box<dyn Alerts> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_alert_client(settings_provider, provider_config).await, + ConfigType::Dummy => Box::new(MockAlerts::new()), } + } - // init the storage client - if self.storage.is_none() { - self.storage = Some(get_storage_client().await); - match get_env_var_or_panic("DATA_STORAGE").as_str() { - "s3" => self - .storage - .as_ref() - .unwrap() - .build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")) - .await - .unwrap(), - _ => panic!("Unsupported Storage Client"), + pub(crate) async fn init_storage_client( + service: ConfigType, + provider_config: Arc<ProviderConfig>, + ) -> Box<dyn DataStorage> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => { + let storage = get_storage_client(provider_config).await; + match get_env_var_or_panic("DATA_STORAGE").as_str() { + "s3" => { + storage.as_ref().build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap() + } + _ => panic!("Unsupported Storage Client"), + } + storage } + ConfigType::Dummy => Box::new(MockDataStorage::new()), } + } - if self.alerts.is_none() { - self.alerts = Some(build_alert_client(&settings_provider, ProviderConfig::AWS(Arc::new(aws_config))).await); + pub(crate) async fn init_queue_client(service: ConfigType) -> Box<dyn QueueProvider> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_queue_client(), + ConfigType::Dummy => Box::new(MockQueueProvider::new()), } + } - // Deleting and Creating the queues in sqs. - create_sqs_queues().await.expect("Not able to delete and create the queues."); - // Deleting the database - drop_database().await.expect("Unable to drop the database."); - // Creating the SNS ARN - create_sns_arn().await.expect("Unable to create the sns arn"); - - let config = Config::new( - self.starknet_client.unwrap_or_else(|| { - let provider = JsonRpcClient::new(HttpTransport::new( - Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), - )); - Arc::new(provider) - }), - self.da_client.unwrap(), - self.prover_client.unwrap_or_else(|| build_prover_service(&settings_provider)), - self.settlement_client.unwrap(), - self.database.unwrap(), - self.queue.unwrap_or_else(|| Box::new(SqsQueue {})), - self.storage.unwrap(), - self.alerts.unwrap(), - ); - - config_force_init(config).await; - - server + pub(crate) async fn init_database( + service: ConfigType, + settings_provider: EnvSettingsProvider, + ) -> Box<dyn Database> { + match service { + ConfigType::Mock(client) => client.into(), + ConfigType::Actual => build_database_client(&settings_provider).await, + ConfigType::Dummy => Box::new(MockDatabase::new()), } + } + + pub(crate) async fn init_starknet_client( + service: ConfigType, + ) -> (Arc<JsonRpcClient<HttpTransport>>, Option<MockServer>) { + fn get_provider() -> (Arc<JsonRpcClient<HttpTransport>>, Option<MockServer>) { + let server = MockServer::start(); + let port = server.port(); + let service = Arc::new(JsonRpcClient::new(HttpTransport::new( + Url::parse(format!("http://localhost:{}", port).as_str()).expect("Failed to parse URL"), + ))); + (service, Some(server)) + } + + fn get_dummy_provider() -> (Arc<JsonRpcClient<HttpTransport>>, Option<MockServer>) { + // Assigning a random port number since this mock will never be used.
+ let port: u16 = 3000; + let service = Arc::new(JsonRpcClient::new(HttpTransport::new( + Url::parse(format!("http://localhost:{}", port).as_str()).expect("Failed to parse URL"), + ))); + (service, None) + } + + match service { + ConfigType::Mock(client) => { + if let MockType::StarknetClient(starknet_client) = client { + (starknet_client, None) + } else { + panic!("Mock client is not a Starknet Client"); + } + } + ConfigType::Actual => get_provider(), + ConfigType::Dummy => get_dummy_provider(), + } } } diff --git a/crates/orchestrator/src/tests/data_storage/mod.rs b/crates/orchestrator/src/tests/data_storage/mod.rs index b9890f36..a041d18f 100644 --- a/crates/orchestrator/src/tests/data_storage/mod.rs +++ b/crates/orchestrator/src/tests/data_storage/mod.rs @@ -1,12 +1,8 @@ -use crate::config::{get_aws_config, ProviderConfig}; -use crate::data_storage::aws_s3::AWSS3; -use crate::data_storage::DataStorage; use bytes::Bytes; -use rstest::rstest; use serde_json::json; -use std::sync::Arc; -use utils::env_utils::get_env_var_or_panic; -use utils::settings::env::EnvSettingsProvider; + +use crate::tests::config::{ConfigType, TestConfigBuilder}; +use rstest::rstest; /// This test checks the ability to put and get data from AWS S3 using `AWSS3`. /// It puts JSON data into a test bucket and retrieves it, verifying the data @@ -15,13 +11,11 @@ use utils::settings::env::EnvSettingsProvider; #[rstest] #[tokio::test] async fn test_put_and_get_data_s3() -> color_eyre::Result<()> { + let services = TestConfigBuilder::new().configure_storage_client(ConfigType::Actual).build().await; + dotenvy::from_filename("../.env.test")?; - let s3_client = AWSS3::new_with_settings( - &EnvSettingsProvider {}, - ProviderConfig::AWS(Arc::new(get_aws_config(&EnvSettingsProvider {}).await)), - ) - .await; - s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); + + let s3_client = services.config.storage(); let mock_data = json!( { diff --git a/crates/orchestrator/src/tests/database/mod.rs b/crates/orchestrator/src/tests/database/mod.rs index 29c3a9e7..52ba9620 100644 --- a/crates/orchestrator/src/tests/database/mod.rs +++ b/crates/orchestrator/src/tests/database/mod.rs @@ -1,31 +1,24 @@ -use crate::config::{config, Config}; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::tests::config::TestConfigBuilder; -use arc_swap::Guard; use chrono::{SubsecRound, Utc}; use rstest::*; -use std::sync::Arc; use uuid::Uuid; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + #[rstest] #[tokio::test] async fn test_database_connection() -> color_eyre::Result<()> { - TestConfigBuilder::new().build().await; + let _services = TestConfigBuilder::new().build().await; Ok(()) } -#[fixture] -async fn get_config() -> Guard> { - config().await -} - /// Tests for `create_job` operation in database trait. /// Creates 3 jobs and asserts them. 
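Every database test below follows the same recipe: build services with a real database, grab the handle, write, read back, assert. Condensed into one sketch (the test name is illustrative; `build_job_item` is this module's fixture helper):

```rust
#[rstest]
#[tokio::test]
async fn database_roundtrip_sketch() {
    // Real MongoDB via ConfigType::Actual; every other client stays a dummy.
    let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await;
    let database_client = services.config.database();

    let job = build_job_item(JobType::SnosRun, JobStatus::Created, 1);
    database_client.create_job(job.clone()).await.unwrap();

    let fetched = database_client.get_job_by_id(job.id).await.unwrap().expect("job should exist");
    assert_eq!(fetched.internal_id, job.internal_id);
}
```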
#[rstest] #[tokio::test] -async fn database_create_job_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_create_job_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -62,12 +55,9 @@ async fn database_create_job_works(#[future] get_config: Guard>) { #[case(true)] #[case(false)] #[tokio::test] -async fn database_get_jobs_without_successor_works( - #[future] get_config: Guard>, - #[case] is_successor: bool, -) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_jobs_without_successor_works(#[case] is_successor: bool) { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -109,9 +99,9 @@ async fn database_get_jobs_without_successor_works( /// - Should return the last successful job #[rstest] #[tokio::test] -async fn database_get_last_successful_job_by_type_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_last_successful_job_by_type_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -137,9 +127,9 @@ async fn database_get_last_successful_job_by_type_works(#[future] get_config: Gu /// - Should return the jobs after internal id #[rstest] #[tokio::test] -async fn database_get_jobs_after_internal_id_by_job_type_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_jobs_after_internal_id_by_job_type_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -172,9 +162,10 @@ async fn database_get_jobs_after_internal_id_by_job_type_works(#[future] get_con /// Happy Case : Creating a job with version 0 and updating the job with version 0 update only. #[rstest] #[tokio::test] -async fn database_update_job_status_passing_case_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_update_job_status_passing_case_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; + let database_client = config.database(); let job = build_job_item(JobType::SnosRun, JobStatus::Created, 1); @@ -190,9 +181,9 @@ async fn database_update_job_status_passing_case_works(#[future] get_config: Gua /// Failing Case : Creating a job with version 1 and updating the job with version 0 update only. 
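The passing/failing pair around this point is optimistic locking: an update carries the version the writer last saw, and only applies if the stored version still matches. The invariant, reduced to a driver-free sketch (illustrative code, not the crate's implementation):

```rust
// Compare-and-swap on a version counter: stale writers lose.
fn try_update(stored_version: &mut i32, seen_version: i32) -> bool {
    if *stored_version != seen_version {
        return false; // someone else updated first
    }
    *stored_version += 1; // every successful update bumps the version
    true
}

#[test]
fn stale_update_is_rejected() {
    let mut version = 0;
    assert!(try_update(&mut version, 0)); // first writer from v0 wins...
    assert!(!try_update(&mut version, 0)); // ...a writer still holding v0 is rejected
}
```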
#[rstest] #[tokio::test] -async fn database_update_job_status_failing_case_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_update_job_status_failing_case_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); // Scenario : diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index d98fe6de..c5c2259b 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -1,21 +1,24 @@ -use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; -use crate::jobs::da_job::{DaError, DaJob}; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::jobs::JobError; -use crate::tests::common::drop_database; -use crate::tests::config::TestConfigBuilder; -use crate::{config::config, jobs::Job}; +use std::collections::HashMap; + use assert_matches::assert_matches; use chrono::{SubsecRound, Utc}; use color_eyre::eyre::eyre; -use da_client_interface::MockDaClient; use mockall::predicate::always; use rstest::rstest; use serde_json::json; use starknet_core::types::{FieldElement, MaybePendingStateUpdate, PendingStateUpdate, StateDiff}; -use std::collections::HashMap; use uuid::Uuid; +use da_client_interface::MockDaClient; + +use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; +use crate::jobs::da_job::{DaError, DaJob}; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::Job; +use crate::jobs::JobError; +use crate::tests::common::drop_database; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + /// Tests the DA Job's handling of a blob length exceeding the supported size. /// It mocks the DA client to simulate the environment and expects an error on job processing. /// Validates the error message for exceeding blob limits against the expected output. 
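The mocking in the DA-job tests below is stock `mockall`: set an expectation on a generated mock, optionally constrain calls with `times`/`with`, and hand back canned values from `returning`. The same style on a made-up trait, as a minimal sketch:

```rust
use mockall::automock;

#[automock]
trait DaLimits {
    fn max_blob_per_txn(&self) -> u64;
}

#[test]
fn expectation_sketch() {
    let mut da = MockDaLimits::new();
    // Mirrors the da_client.expect_max_blob_per_txn().returning(|| 1) calls below.
    da.expect_max_blob_per_txn().times(1).returning(|| 1);
    assert_eq!(da.max_blob_per_txn(), 1);
}
```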
@@ -40,15 +43,19 @@ async fn test_da_job_process_job_failure_on_small_blob_size( da_client.expect_max_blob_per_txn().with().returning(|| 1); da_client.expect_max_bytes_per_blob().with().returning(|| 1200); - let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_starknet_client(ConfigType::Actual) + .configure_storage_client(ConfigType::Actual) + .configure_da_client(da_client.into()) + .build() + .await; let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading"); let state_update = MaybePendingStateUpdate::Update(state_update); let state_update = serde_json::to_value(&state_update).unwrap(); let response = json!({ "id": 640641,"jsonrpc":"2.0","result": state_update }); - + let server = services.server.unwrap(); get_nonce_attached(&server, nonces_file.as_str()); let state_update_mock = server.mock(|when, then| { @@ -56,11 +63,11 @@ async fn test_da_job_process_job_failure_on_small_blob_size( then.status(200).body(serde_json::to_vec(&response).unwrap()); }); - let max_blob_per_txn = config.da_client().max_blob_per_txn().await; + let max_blob_per_txn = services.config.da_client().max_blob_per_txn().await; let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), @@ -84,7 +91,7 @@ async fn test_da_job_process_job_failure_on_small_blob_size( ); state_update_mock.assert(); - let _ = drop_database().await; + // let _ = drop_database().await; } /// Tests DA Job processing failure when a block is in pending state. @@ -94,8 +101,12 @@ async fn test_da_job_process_job_failure_on_small_blob_size( #[rstest] #[tokio::test] async fn test_da_job_process_job_failure_on_pending_block() { - let server = TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_starknet_client(ConfigType::Actual) + .configure_da_client(ConfigType::Actual) + .build() + .await; + let server = services.server.unwrap(); let internal_id = "1"; let pending_state_update = MaybePendingStateUpdate::PendingUpdate(PendingStateUpdate { @@ -120,7 +131,7 @@ async fn test_da_job_process_job_failure_on_pending_block() { let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), @@ -181,8 +192,13 @@ async fn test_da_job_process_job_success( da_client.expect_max_blob_per_txn().with().returning(|| 6); da_client.expect_max_bytes_per_blob().with().returning(|| 131072); - let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_starknet_client(ConfigType::Actual) + .configure_storage_client(ConfigType::Actual) + .configure_da_client(da_client.into()) + .build() + .await; + let server = services.server.unwrap(); let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading"); @@ -198,7 +214,7 @@ async fn test_da_job_process_job_success( let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index f39851d7..fd457be1 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ 
b/crates/orchestrator/src/tests/jobs/mod.rs @@ -1,8 +1,23 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use mockall::predicate::eq; +use mongodb::bson::doc; +use omniqueue::QueueError; use rstest::rstest; +use tokio::time::sleep; +use uuid::Uuid; -use crate::config::config; +use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; use crate::jobs::handle_job_failure; +use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::JobType; +use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus}; +use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob}; +use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; +use crate::tests::common::MessagePayloadType; +use crate::tests::config::ConfigType; use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder}; use super::database::build_job_item; @@ -18,22 +33,6 @@ pub mod state_update_job; use assert_matches::assert_matches; use chrono::{SubsecRound, Utc}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -use mockall::predicate::eq; -use mongodb::bson::doc; -use omniqueue::QueueError; -use tokio::time::sleep; -use uuid::Uuid; - -use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; -use crate::jobs::job_handler_factory::mock_factory; -use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus}; -use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob}; -use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; -use crate::tests::common::MessagePayloadType; /// Tests the `create_job` function when the job does not exist in the db. #[rstest] @@ -46,22 +45,25 @@ async fn create_job_job_does_not_exists_in_db_works() { let job_item_clone = job_item.clone(); job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item_clone.clone())); - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; // Mocking the `get_job_handler` call in create_job function. let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler)); let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).with(eq(JobType::SnosRun)).return_once(move |_| Arc::clone(&job_handler)); - assert!(create_job(JobType::SnosRun, "0".to_string(), HashMap::new()).await.is_ok()); + assert!(create_job(JobType::SnosRun, "0".to_string(), HashMap::new(), services.config.clone()).await.is_ok()); let mut hashmap: HashMap<String, String> = HashMap::new(); hashmap.insert(JOB_PROCESS_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); hashmap.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); // Db checks. - let job_in_db = config.database().get_job_by_id(job_item.id).await.unwrap().unwrap(); + let job_in_db = services.config.database().get_job_by_id(job_item.id).await.unwrap().unwrap(); assert_eq!(job_in_db.id, job_item.id); assert_eq!(job_in_db.internal_id, job_item.internal_id); assert_eq!(job_in_db.metadata, hashmap); @@ -70,7 +72,8 @@ async fn create_job_job_does_not_exists_in_db_works() { sleep(Duration::from_secs(5)).await; // Queue checks.
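These queue checks, here and in every jobs test that follows, reduce to three steps: consume from the named queue, decode the JSON payload, compare ids. Collapsed into one illustrative helper (assuming `MessagePayloadType` from `tests::common`; the helper itself is not in the crate):

```rust
async fn assert_job_enqueued(config: &Config, queue: &str, expected_id: uuid::Uuid) {
    let delivery = config.queue().consume_message_from_queue(queue.to_string()).await.unwrap();
    let payload: MessagePayloadType = delivery.payload_serde_json().unwrap().unwrap();
    assert_eq!(payload.id, expected_id);
}
```

The double `unwrap` on `payload_serde_json` is there because omniqueue wraps the decoded payload in a `Result` around an `Option` (a message may carry no payload).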
- let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_messages = + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -81,20 +84,25 @@ async fn create_job_job_does_not_exists_in_db_works() { async fn create_job_job_exists_in_db_works() { let job_item = build_job_item_by_type_and_status(JobType::ProofCreation, JobStatus::Created, "0".to_string()); - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); database_client.create_job(job_item).await.unwrap(); - assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()) + .await + .is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -104,21 +112,26 @@ async fn create_job_job_exists_in_db_works() { #[should_panic(expected = "Job type not implemented yet.")] #[tokio::test] async fn create_job_job_handler_is_not_implemented_panics() { - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; // Mocking the `get_job_handler` call in create_job function. let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).returning(|_| panic!("Job type not implemented yet.")); - assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()) + .await + .is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. 
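`mock_factory::get_job_handler_context()`, used throughout these tests, is mockall's module-mocking mechanism: annotating a module of functions generates a `mock_`-prefixed twin whose `*_context()` guard reroutes calls for the current test. The mechanism on a toy module, sketched:

```rust
use mockall::automock;

#[automock]
pub mod factory {
    pub fn get_handler(kind: u8) -> u8 {
        kind
    }
}

#[test]
fn context_sketch() {
    // The guard holds the expectation for this test's lifetime.
    let ctx = mock_factory::get_handler_context();
    ctx.expect().returning(|_| 42);
    assert_eq!(mock_factory::get_handler(0), 42);
}
```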
let consumed_messages = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -135,9 +148,12 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works let job_item = build_job_item_by_type_and_status(job_type.clone(), job_status.clone(), "1".to_string()); // Building config - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let database_client = services.config.database(); let mut job_handler = MockJob::new(); @@ -152,7 +168,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).with(eq(job_type.clone())).returning(move |_| Arc::clone(&job_handler)); - assert!(process_job(job_item.id).await.is_ok()); + assert!(process_job(job_item.id, services.config.clone()).await.is_ok()); // Getting the updated job. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); // checking if job_status is updated in db @@ -165,7 +181,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works // Queue checks let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -179,14 +195,17 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Completed, "1".to_string()); // building config - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let database_client = services.config.database(); // creating job in database database_client.create_job(job_item.clone()).await.unwrap(); - assert!(process_job(job_item.id).await.is_err()); + assert!(process_job(job_item.id, services.config.clone()).await.is_err()); let job_in_db = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); // Job should be untouched in db. @@ -197,7 +216,7 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er // Queue checks. 
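The `hashmap`/metadata assertions in these tests hinge on how attempt counters are stored: as stringified integers inside the job's `HashMap<String, String>` metadata, bumped by `increment_key_in_metadata`. Presumably along these lines (sketch only; the name and exact semantics here are assumptions, not the crate's code):

```rust
use std::collections::HashMap;

// A missing key counts as zero; values stay strings so the map needs no typed schema.
fn increment_key(metadata: &HashMap<String, String>, key: &str) -> HashMap<String, String> {
    let mut out = metadata.clone();
    let next = out.get(key).and_then(|v| v.parse::<u64>().ok()).unwrap_or(0) + 1;
    out.insert(key.to_string(), next.to_string());
    out
}

#[test]
fn counter_sketch() {
    let bumped = increment_key(&HashMap::new(), "process_attempt_no"); // illustrative key
    assert_eq!(bumped["process_attempt_no"], "1");
}
```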
let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -210,17 +229,20 @@ async fn process_job_job_does_not_exists_in_db_works() { let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); // building config - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - assert!(process_job(job_item.id).await.is_err()); + assert!(process_job(job_item.id, services.config.clone()).await.is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -242,18 +264,24 @@ async fn process_job_two_workers_process_same_job_works() { ctx.expect().times(1).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); // building config - TestConfigBuilder::new().build().await; - let config = config().await; - let db_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let db_client = services.config.database(); let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); // Creating the job in the db db_client.create_job(job_item.clone()).await.unwrap(); - // Simulating the two workers - let worker_1 = tokio::spawn(async move { process_job(job_item.id).await }); - let worker_2 = tokio::spawn(async move { process_job(job_item.id).await }); + let config_1 = services.config.clone(); + let config_2 = services.config.clone(); + + // Simulating the two workers, Uuid has in-built copy trait + let worker_1 = tokio::spawn(async move { process_job(job_item.id, config_1).await }); + let worker_2 = tokio::spawn(async move { process_job(job_item.id, config_2).await }); // waiting for workers to complete the processing let (result_1, result_2) = tokio::join!(worker_1, worker_2); @@ -280,10 +308,13 @@ async fn verify_job_with_verified_status_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -297,7 +328,7 @@ async fn verify_job_with_verified_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. 
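Worth pausing on `process_job_two_workers_process_same_job_works`, a few lines up: it is the one test where the job's `version` field earns its keep end to end. Two tasks race on the same job id; if the optimistic lock works, exactly one processing attempt may succeed. The race's shape, restated outside the diff (using `services` and `job_item` as in that test; the final assertion states the expected invariant rather than the test's literal checks):

```rust
let config_1 = services.config.clone();
let config_2 = services.config.clone();

// Uuid is Copy, so the same id can move into both tasks.
let worker_1 = tokio::spawn(async move { process_job(job_item.id, config_1).await });
let worker_2 = tokio::spawn(async move { process_job(job_item.id, config_2).await });

let (result_1, result_2) = tokio::join!(worker_1, worker_2);
// Exactly one of the two racing attempts should win the version check.
assert_ne!(result_1.unwrap().is_ok(), result_2.unwrap().is_ok());
```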
let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -308,10 +339,10 @@ async fn verify_job_with_verified_status_works() { // Queue checks. let consumed_messages_verification_queue = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); let consumed_messages_processing_queue = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -324,10 +355,13 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -340,7 +374,7 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -350,7 +384,8 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { sleep(Duration::from_secs(5)).await; // Queue checks. - let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_messages = + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -369,10 +404,13 @@ async fn verify_job_with_rejected_status_works() { job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -386,7 +424,7 @@ async fn verify_job_with_rejected_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -398,7 +436,7 @@ async fn verify_job_with_rejected_status_works() { // Queue checks. 
let consumed_messages_processing_queue = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -411,10 +449,13 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -429,7 +470,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -441,7 +482,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { // Queue checks let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -460,10 +501,13 @@ async fn verify_job_with_pending_status_works() { job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -478,7 +522,7 @@ async fn verify_job_with_pending_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -490,7 +534,7 @@ async fn verify_job_with_pending_status_works() { // Queue checks. 
let consumed_messages_verification_queue = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); } @@ -521,9 +565,13 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i #[case(JobType::DataSubmission, JobStatus::VerificationFailed)] #[tokio::test] async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = services.config.database(); let internal_id = 1; // create a job, with already available "last_job_status" @@ -538,9 +586,10 @@ async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobTy database_client.create_job(job_expected.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + handle_job_failure(job_id, services.config.clone()).await.expect("handle_job_failure failed to run"); - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); assert_eq!(job_fetched, job_expected); } @@ -550,9 +599,13 @@ async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobTy #[case::verification_timeout(JobType::SnosRun, JobStatus::VerificationTimeout)] #[tokio::test] async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = services.config.database(); let internal_id = 1; // create a job @@ -563,9 +616,10 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT database_client.create_job(job.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + handle_job_failure(job_id, services.config.clone()).await.expect("handle_job_failure failed to run"); - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); // creating expected output let mut job_expected = job.clone(); @@ -584,9 +638,13 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType) { let job_status = JobStatus::Completed; - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = 
services.config.database(); let internal_id = 1; // create a job @@ -597,10 +655,13 @@ async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType database_client.create_job(job_expected.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed."); + handle_job_failure(job_id, services.config.clone()) + .await + .expect("Test call to handle_job_failure should have passed."); // The completed job status on db is untouched. - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); assert_eq!(job_fetched, job_expected); } diff --git a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs index 25204d04..27def2e0 100644 --- a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs @@ -4,9 +4,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::Read; use std::path::Path; -use std::sync::Arc; -use crate::config::config; use crate::data_storage::MockDataStorage; use httpmock::prelude::*; use mockall::predicate::eq; @@ -26,10 +24,9 @@ use crate::tests::config::TestConfigBuilder; #[rstest] #[tokio::test] async fn test_create_job() { - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new().build().await; - let job = ProvingJob.create_job(&config, String::from("0"), HashMap::new()).await; + let job = ProvingJob.create_job(services.config.clone(), String::from("0"), HashMap::new()).await; assert!(job.is_ok()); let job = job.unwrap(); @@ -48,10 +45,9 @@ async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) { let mut prover_client = MockProverClient::new(); prover_client.expect_get_task_status().times(1).returning(|_| Ok(TaskStatus::Succeeded)); - TestConfigBuilder::new().mock_prover_client(Box::new(prover_client)).build().await; + let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await; - let config = config().await; - assert!(ProvingJob.verify_job(&config, &mut job_item).await.is_ok()); + assert!(ProvingJob.verify_job(services.config, &mut job_item).await.is_ok()); } #[rstest] @@ -74,17 +70,17 @@ async fn test_process_job() { let buffer_bytes = Bytes::from(buffer); storage.expect_get_data().with(eq("0/pie.zip")).return_once(move |_| Ok(buffer_bytes)); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_prover_client(Box::new(prover_client)) - .mock_storage_client(Box::new(storage)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_prover_client(prover_client.into()) + .configure_storage_client(storage.into()) .build() .await; assert_eq!( ProvingJob .process_job( - config().await.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: "0".into(), diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 4bdea3c6..ebb27f76 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -1,35 +1,32 @@ use std::collections::HashMap; use std::fs; use std::path::PathBuf; -use std::sync::Arc; use assert_matches::assert_matches; 
use bytes::Bytes; +use color_eyre::eyre::eyre; use httpmock::prelude::*; +use lazy_static::lazy_static; use mockall::predicate::{always, eq}; use rstest::*; -use settlement_client_interface::MockSettlementClient; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; +use url::Url; -use color_eyre::eyre::eyre; -use utils::env_utils::get_env_var_or_panic; +use settlement_client_interface::MockSettlementClient; -use crate::config::config; use crate::constants::{BLOB_DATA_FILE_NAME, PROGRAM_OUTPUT_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; use crate::data_storage::MockDataStorage; use crate::jobs::constants::{ JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY, JOB_METADATA_STATE_UPDATE_FETCH_FROM_TESTS, JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO, JOB_PROCESS_ATTEMPT_METADATA_KEY, }; -use crate::jobs::state_update_job::utils::{bytes_to_vec_u8, fetch_program_data_for_block, hex_string_to_u8_vec}; +use crate::jobs::state_update_job::utils::{bytes_to_vec_u8, hex_string_to_u8_vec}; use crate::jobs::state_update_job::{StateUpdateError, StateUpdateJob}; use crate::jobs::types::{JobStatus, JobType}; use crate::jobs::{Job, JobError}; -use crate::tests::common::{default_job_item, get_storage_client}; +use crate::tests::common::default_job_item; use crate::tests::config::TestConfigBuilder; -use lazy_static::lazy_static; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::JsonRpcClient; -use url::Url; lazy_static! { pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().unwrap(); @@ -42,12 +39,12 @@ pub const X_0_FILE_NAME: &str = "x_0.txt"; #[rstest] #[tokio::test] async fn test_process_job_attempt_not_present_fails() { - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new().build().await; let mut job = default_job_item(); - let config = config().await; + let state_update_job = StateUpdateJob {}; - let res = state_update_job.process_job(&config, &mut job).await.unwrap_err(); + let res = state_update_job.process_job(services.config, &mut job).await.unwrap_err(); assert_eq!(res, JobError::StateUpdateJobError(StateUpdateError::AttemptNumberNotFound)); } @@ -62,9 +59,8 @@ async fn test_process_job_works( ) { // Will be used by storage client which we call while storing the data. - use num::ToPrimitive; + use crate::tests::config::ConfigType; - use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); // Mocking the settlement client. @@ -75,32 +71,77 @@ async fn test_process_job_works( // This must be the last block number and should be returned as an output from the process job. let last_block_number = block_numbers[block_numbers.len() - 1]; - // Storing `blob_data` and `snos_output` in storage client - store_data_in_storage_client_for_s3(block_numbers.clone()).await; + // Adding expectations for each block number to be called by settlement client. + for block in block_numbers.iter().skip(processing_start_index as usize) { + // Getting the blob data from file. + let blob_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)), + ) + .unwrap(); - // Building a temp config that will be used by `fetch_blob_data_for_block` and `fetch_snos_for_block` - // functions while fetching the blob data from storage client. 
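A note on the fixtures these state-update tests read: blob and program-output files live on disk as hex strings and are decoded through `hex_string_to_u8_vec` before hitting the storage client. The decoding step is presumably of this shape (illustrative sketch, not the crate's implementation):

```rust
// Assumes an even number of hex digits after the optional 0x prefix.
fn hex_to_bytes(hex: &str) -> Result<Vec<u8>, std::num::ParseIntError> {
    let hex = hex.trim().trim_start_matches("0x");
    (0..hex.len()).step_by(2).map(|i| u8::from_str_radix(&hex[i..i + 2], 16)).collect()
}

#[test]
fn two_hex_chars_per_byte() {
    assert_eq!(hex_to_bytes("0x0a0b").unwrap(), vec![10, 11]);
}
```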
- TestConfigBuilder::new().build().await; + let blob_data_vec = vec![hex_string_to_u8_vec(&blob_data).unwrap()]; + // Getting the program output data from file. + let program_output_data = fs::read_to_string( + CURRENT_PATH + .join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, PROGRAM_OUTPUT_FILE_NAME)), + ) + .unwrap(); - // test_process_job_works uses nonce just to write expect_update_state_with_blobs for a mocked settlement client, - // which means that nonce ideally is never checked against, hence supplying any `u64` `nonce` works. - let nonce: u64 = 3; - settlement_client.expect_get_nonce().with().returning(move || Ok(nonce)); + let program_output_bytes = Bytes::from(program_output_data); + let program_output_bytes_ref = program_output_bytes.as_ref(); + + let program_output_data_vec = bytes_to_vec_u8(program_output_bytes_ref); + + println!("Mine {:?}", program_output_data_vec); - // Adding expectations for each block number to be called by settlement client. - for block in block_numbers.iter().skip(processing_start_index as usize) { - let blob_data = fetch_blob_data_for_block(block.to_u64().unwrap()).await.unwrap(); - let program_data = fetch_program_data_for_block(block.to_u64().unwrap()).await.unwrap(); settlement_client .expect_update_state_with_blobs() - .with(eq(program_data), eq(blob_data), always()) + .with(eq(program_output_data_vec), eq(blob_data_vec), always()) .times(1) .returning(|_, _, _| Ok("0xbeef".to_string())); } settlement_client.expect_get_last_settled_block().with().returning(move || Ok(651052)); + // Setting random nonce + settlement_client.expect_get_nonce().with().returning(move || Ok(2)); - // Building new config with mocked settlement client - TestConfigBuilder::new().mock_settlement_client(Box::new(settlement_client)).build().await; + // Building a temp config that will be used by `fetch_blob_data_for_block` and `fetch_snos_for_block` + // functions while fetching the blob data from storage client. + let services = TestConfigBuilder::new() + .configure_storage_client(ConfigType::Actual) + .configure_settlement_client(settlement_client.into()) + .build() + .await; + + let storage_client = services.config.storage(); + + for block in block_numbers { + // Getting the blob data from file. + let blob_data_key = block.to_owned().to_string() + "/" + BLOB_DATA_FILE_NAME; + let blob_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)), + ) + .unwrap(); + let blob_data_vec = hex_string_to_u8_vec(&blob_data).unwrap(); + + // Getting the snos data from file. + let snos_output_key = block.to_owned().to_string() + "/" + SNOS_OUTPUT_FILE_NAME; + let snos_output_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, SNOS_OUTPUT_FILE_NAME)), + ) + .unwrap(); + + // Getting the program output data from file. + let program_output_key = block.to_owned().to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME; + let program_output_data = fs::read_to_string( + CURRENT_PATH + .join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, PROGRAM_OUTPUT_FILE_NAME)), + ) + .unwrap(); + + storage_client.put_data(Bytes::from(snos_output_data), &snos_output_key).await.unwrap(); + storage_client.put_data(Bytes::from(blob_data_vec), &blob_data_key).await.unwrap(); + storage_client.put_data(Bytes::from(program_output_data), &program_output_key).await.unwrap(); + } // setting last failed block number as 651053. 
// setting blocks yet to process as 651054 and 651055. @@ -117,9 +158,8 @@ async fn test_process_job_works( job.job_type = JobType::StateTransition; job.metadata = metadata; - let config = config().await; let state_update_job = StateUpdateJob {}; - let res = state_update_job.process_job(&config, &mut job).await.unwrap(); + let res = state_update_job.process_job(services.config, &mut job).await.unwrap(); assert_eq!(res, last_block_number.to_string()); } @@ -128,18 +168,16 @@ async fn test_process_job_works( #[rstest] #[tokio::test] async fn create_job_works() { - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new().build().await; - let config = config().await; - - let job = StateUpdateJob.create_job(&config, String::from("0"), HashMap::default()).await; + let job = StateUpdateJob.create_job(services.config, String::from("0"), HashMap::default()).await; assert!(job.is_ok()); let job = job.unwrap(); let job_type = job.job_type; assert_eq!(job_type, JobType::StateTransition, "job_type should be StateTransition"); - assert!(!(job.id.is_nil()), "id should not be nil"); + assert!(!job.id.is_nil(), "id should not be nil"); assert_eq!(job.status, JobStatus::Created, "status should be Created"); assert_eq!(job.version, 0_i32, "version should be 0"); assert_eq!(job.external_id.unwrap_string().unwrap(), String::new(), "external_id should be empty string"); @@ -211,9 +249,9 @@ async fn process_job_works_unit_test() { .returning(|_, _, _| Ok(String::from("0x5d17fac98d9454030426606019364f6e68d915b91f6210ef1e2628cd6987442"))); } - TestConfigBuilder::new() - .mock_settlement_client(Box::new(settlement_client)) - .mock_storage_client(Box::new(storage_client)) + let services = TestConfigBuilder::new() + .configure_settlement_client(settlement_client.into()) + .configure_storage_client(storage_client.into()) .build() .await; @@ -223,8 +261,8 @@ async fn process_job_works_unit_test() { metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0")); let mut job = - StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap(); - assert_eq!(StateUpdateJob.process_job(config().await.as_ref(), &mut job).await.unwrap(), "651056".to_string()) + StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap(); + assert_eq!(StateUpdateJob.process_job(services.config, &mut job).await.unwrap(), "651056".to_string()) } #[rstest] @@ -242,19 +280,19 @@ async fn process_job_invalid_inputs_errors(#[case] block_numbers_to_settle: Stri Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), )); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_settlement_client(Box::new(settlement_client)) + let services = TestConfigBuilder::new() + .configure_settlement_client(settlement_client.into()) + .configure_starknet_client(provider.into()) .build() .await; - let config = config().await; let mut metadata: HashMap = HashMap::new(); metadata.insert(String::from(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY), block_numbers_to_settle); metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0")); - let mut job = StateUpdateJob.create_job(&config, String::from("internal_id"), metadata).await.unwrap(); - let status = StateUpdateJob.process_job(&config, &mut job).await; + let mut job = + StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap(); + let status = 
StateUpdateJob.process_job(services.config, &mut job).await;
assert!(status.is_err());
if let Err(error) = status {
@@ -279,9 +317,9 @@ async fn process_job_invalid_input_gap_panics() {
Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), ));
- TestConfigBuilder::new()
- .mock_starknet_client(Arc::new(provider))
- .mock_settlement_client(Box::new(settlement_client))
+ let services = TestConfigBuilder::new()
+ .configure_starknet_client(provider.into())
+ .configure_settlement_client(settlement_client.into())
.build()
.await;
@@ -290,8 +328,8 @@ metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0"));
let mut job =
- StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap();
- let response = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await;
+ StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap();
+ let response = StateUpdateJob.process_job(services.config, &mut job).await;
assert_matches!(response, Err(e) => {
@@ -310,40 +348,6 @@ async fn load_state_diff_file(block_no: u64) -> Vec<u8> {
hex_string_to_u8_vec(&file_data).unwrap()
}
-async fn store_data_in_storage_client_for_s3(block_numbers: Vec<u64>) {
- let storage_client = get_storage_client().await;
- storage_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
-
- for block in block_numbers {
- // Getting the blob data from file.
- let blob_data_key = block.to_owned().to_string() + "/" + BLOB_DATA_FILE_NAME;
- let blob_data = fs::read_to_string(
- CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)),
- )
- .unwrap();
- let blob_data_vec = hex_string_to_u8_vec(&blob_data).unwrap();
-
- // Getting the snos data from file.
- let snos_output_key = block.to_owned().to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
- let snos_output_data = fs::read_to_string(
- CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, SNOS_OUTPUT_FILE_NAME)),
- )
- .unwrap();
-
- // Getting the program output data from file.
- let program_output_key = block.to_owned().to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
- let program_output_data = fs::read_to_string(
- CURRENT_PATH
- .join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, PROGRAM_OUTPUT_FILE_NAME)),
- )
- .unwrap();
-
- storage_client.put_data(Bytes::from(snos_output_data), &snos_output_key).await.unwrap();
- storage_client.put_data(Bytes::from(blob_data_vec), &blob_data_key).await.unwrap();
- storage_client.put_data(Bytes::from(program_output_data), &program_output_key).await.unwrap();
- }
-}
-
fn parse_block_numbers(blocks_to_settle: &str) -> color_eyre::Result<Vec<u64>> {
let sanitized_blocks = blocks_to_settle.replace(' ', "");
let block_numbers: Vec<u64> = sanitized_blocks
diff --git a/crates/orchestrator/src/tests/server/mod.rs b/crates/orchestrator/src/tests/server/mod.rs
index 33b7f657..670a8933 100644
--- a/crates/orchestrator/src/tests/server/mod.rs
+++ b/crates/orchestrator/src/tests/server/mod.rs
@@ -1,6 +1,5 @@
use std::io::Read;
use std::net::SocketAddr;
-use std::sync::Arc;
use axum::http::StatusCode;
use hyper::body::Buf;
@@ -21,7 +20,7 @@ pub async fn setup_server() -> SocketAddr {
Url::parse("http://localhost:9944".to_string().as_str()).expect("Failed to parse URL"), ));
- TestConfigBuilder::new().mock_starknet_client(Arc::new(provider)).build().await;
+ TestConfigBuilder::new().configure_starknet_client(provider.into()).build().await;
let host = get_env_var_or_default("HOST", "127.0.0.1");
let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
@@ -62,5 +61,6 @@ async fn test_health_endpoint(#[future] setup_server: SocketAddr) {
#[rstest]
#[tokio::test]
async fn test_init_consumer() {
- assert!(init_consumers().await.is_ok());
+ let services = TestConfigBuilder::new().build().await;
+ assert!(init_consumers(services.config).await.is_ok());
}
diff --git a/crates/orchestrator/src/tests/workers/proving/mod.rs b/crates/orchestrator/src/tests/workers/proving/mod.rs
index 22f4a36c..05051e54 100644
--- a/crates/orchestrator/src/tests/workers/proving/mod.rs
+++ b/crates/orchestrator/src/tests/workers/proving/mod.rs
@@ -29,7 +29,7 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dyn Error>> {
Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), ));
- TestConfigBuilder::new()
- .mock_starknet_client(Arc::new(provider))
- .mock_db_client(Box::new(db))
- .mock_queue(Box::new(queue))
- .mock_da_client(Box::new(da_client))
+ let services = TestConfigBuilder::new()
+ .configure_starknet_client(provider.into())
+ .configure_database(db.into())
+ .configure_queue_client(queue.into())
+ .configure_da_client(da_client.into())
.build()
.await;
@@ -110,7 +110,7 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
});
let snos_worker = SnosWorker {};
- snos_worker.run_worker().await?;
+ snos_worker.run_worker(services.config).await?;
rpc_block_call_mock.assert();
diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs
index 98f979a0..355f5fe5 100644
--- a/crates/orchestrator/src/tests/workers/update_state/mod.rs
+++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs
@@ -74,12 +74,10 @@ async fn test_update_state_worker(
// creation)
let completed_jobs =
get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2);
- for job in
completed_jobs {
- db.expect_get_job_by_internal_id_and_type()
- .times(1)
- .with(eq(job.internal_id.to_string()), eq(JobType::StateTransition))
- .returning(|_, _| Ok(None));
- }
+ db.expect_get_job_by_internal_id_and_type()
+ .times(1)
+ .with(eq(completed_jobs[0].internal_id.to_string()), eq(JobType::StateTransition))
+ .returning(|_, _| Ok(None));
// mocking the creation of jobs
let job_item = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4());
@@ -111,16 +109,16 @@ async fn test_update_state_worker(
));
// mock block number (madara) : 5
- TestConfigBuilder::new()
- .mock_starknet_client(Arc::new(provider))
- .mock_db_client(Box::new(db))
- .mock_queue(Box::new(queue))
- .mock_da_client(Box::new(da_client))
+ let services = TestConfigBuilder::new()
+ .configure_starknet_client(provider.into())
+ .configure_database(db.into())
+ .configure_queue_client(queue.into())
+ .configure_da_client(da_client.into())
.build()
.await;
let update_state_worker = UpdateStateWorker {};
- update_state_worker.run_worker().await?;
+ update_state_worker.run_worker(services.config).await?;
Ok(())
}
diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs
index 3c2d4331..23e43770 100644
--- a/crates/orchestrator/src/workers/data_submission_worker.rs
+++ b/crates/orchestrator/src/workers/data_submission_worker.rs
@@ -1,10 +1,11 @@
-use crate::config::config;
+use crate::config::Config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::collections::HashMap;
use std::error::Error;
+use std::sync::Arc;
pub struct DataSubmissionWorker;
@@ -14,9 +15,7 @@ impl Worker for DataSubmissionWorker {
// 1. Fetch the latest completed Proving job.
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the latest DA job already created until the latest completed proving job.
- async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
- let config = config().await;
-
+ async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
// provides the latest completed proof creation job id
let latest_proven_job_id = config
.database()
@@ -40,7 +39,7 @@ impl Worker for DataSubmissionWorker {
// creating data submission jobs for the latest blocks that don't have data submission jobs yet.
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
- create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?;
+ create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new(), config.clone()).await?;
}
Ok(())
diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs
index 7b4a83c9..cc3c53e7 100644
--- a/crates/orchestrator/src/workers/mod.rs
+++ b/crates/orchestrator/src/workers/mod.rs
@@ -1,6 +1,6 @@
-use crate::{config::config, jobs::types::JobStatus};
+use crate::{config::Config, jobs::types::JobStatus};
use async_trait::async_trait;
-use std::error::Error;
+use std::{error::Error, sync::Arc};
use thiserror::Error;
pub mod data_submission_worker;
@@ -23,14 +23,14 @@ pub enum WorkerError {
#[async_trait]
pub trait Worker: Send + Sync {
- async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
- if !self.is_worker_enabled().await? {
+ async fn run_worker_if_enabled(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
+ if !self.is_worker_enabled(config.clone()).await?
{ return Ok(()); }
- self.run_worker().await
+ self.run_worker(config).await
}
- async fn run_worker(&self) -> Result<(), Box<dyn Error>>;
+ async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>>;
// Assumption: if a job for block X fails, we don't want the worker to respawn another job for the same block.
@@ -42,9 +42,7 @@ pub trait Worker: Send + Sync {
// Checks whether any of the jobs have failed.
// Failure: JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed
// Halts any new job creation until the count of failed jobs is back to zero.
- async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
- let config = config().await;
-
+ async fn is_worker_enabled(&self, config: Arc<Config>) -> Result<bool, Box<dyn Error>> {
let failed_jobs = config
.database()
.get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1))
.await?
diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs
index ea6e5544..abf6da53 100644
--- a/crates/orchestrator/src/workers/proof_registration.rs
+++ b/crates/orchestrator/src/workers/proof_registration.rs
@@ -1,8 +1,8 @@
-use std::error::Error;
+use std::{error::Error, sync::Arc};
use async_trait::async_trait;
-use crate::workers::Worker;
+use crate::{config::Config, workers::Worker};
pub struct ProofRegistrationWorker;
@@ -11,7 +11,7 @@ impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
- async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
+ async fn run_worker(&self, _config: Arc<Config>) -> Result<(), Box<dyn Error>> {
todo!()
}
}
diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs
index 4ec85b91..e263b988 100644
--- a/crates/orchestrator/src/workers/proving.rs
+++ b/crates/orchestrator/src/workers/proving.rs
@@ -1,9 +1,10 @@
-use crate::config::config;
+use crate::config::Config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
use async_trait::async_trait;
use std::error::Error;
+use std::sync::Arc;
pub struct ProvingWorker;
@@ -11,15 +12,14 @@ impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
- async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
- let config = config().await;
+ async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
let successful_snos_jobs = config
.database()
.get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation)
.await?;
for job in successful_snos_jobs {
- create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata).await?
+ create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await?
}
Ok(())
diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs
index 7508263a..69529a77 100644
--- a/crates/orchestrator/src/workers/snos.rs
+++ b/crates/orchestrator/src/workers/snos.rs
@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::error::Error;
+use std::sync::Arc;
use async_trait::async_trait;
use starknet::providers::Provider;
-use crate::config::config;
+use crate::config::Config;
use crate::jobs::create_job;
use crate::jobs::types::{JobStatus, JobType};
use crate::workers::Worker;
@@ -16,8 +17,7 @@ impl Worker for SnosWorker {
/// 1.
Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
- async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
- let config = config().await;
+ async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
let provider = config.starknet_client();
let latest_block_number = provider.block_number().await?;
let latest_block_processed_data = config
@@ -38,7 +38,7 @@ impl Worker for SnosWorker {
}
for x in latest_block_processed + 1..latest_block_number + 1 {
- create_job(JobType::SnosRun, x.to_string(), HashMap::new()).await?;
+ create_job(JobType::SnosRun, x.to_string(), HashMap::new(), config.clone()).await?;
}
Ok(())
diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs
index 78753be2..4c99e2c3 100644
--- a/crates/orchestrator/src/workers/update_state.rs
+++ b/crates/orchestrator/src/workers/update_state.rs
@@ -1,8 +1,9 @@
use std::error::Error;
+use std::sync::Arc;
use async_trait::async_trait;
-use crate::config::config;
+use crate::config::Config;
use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY;
use crate::jobs::create_job;
use crate::jobs::types::{JobItem, JobStatus, JobType};
@@ -15,8 +16,7 @@ impl Worker for UpdateStateWorker {
/// 1. Fetch the last successful state update job
/// 2. Fetch all successful proving jobs covering blocks after the last state update
/// 3. Create state updates for all the blocks that don't have a state update job
- async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
- let config = config().await;
+ async fn run_worker(&self, config: Arc<Config>) -> Result<(), Box<dyn Error>> {
let latest_successful_job =
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;
@@ -42,6 +42,7 @@ impl Worker for UpdateStateWorker {
JobType::StateTransition,
successful_da_jobs_without_successor[0].internal_id.clone(),
metadata,
+ config,
)
.await?;
@@ -66,7 +67,7 @@ impl Worker for UpdateStateWorker {
Self::parse_job_items_into_block_number_list(latest_successful_jobs_without_successor.clone()),
);
- create_job(JobType::StateTransition, job.internal_id, metadata).await?;
+ create_job(JobType::StateTransition, job.internal_id, metadata, config).await?;
return Ok(());
}
diff --git a/crates/prover-services/sharp-service/src/config.rs b/crates/prover-services/sharp-service/src/config.rs
index 06b29c78..2e17b806 100644
--- a/crates/prover-services/sharp-service/src/config.rs
+++ b/crates/prover-services/sharp-service/src/config.rs
@@ -17,9 +17,9 @@ pub struct SharpConfig {
impl SharpConfig {
pub fn new_with_settings(settings: &impl Settings) -> color_eyre::Result<Self> {
Ok(Self {
- service_url: settings.get_settings("SHARP_URL")?.parse().unwrap(),
- rpc_node_url: settings.get_settings("SETTLEMENT_RPC_URL")?.parse().unwrap(),
- verifier_address: settings.get_settings("MEMORY_PAGES_CONTRACT_ADDRESS")?.parse().unwrap(),
+ service_url: settings.get_settings_or_panic("SHARP_URL").parse().unwrap(),
+ rpc_node_url: settings.get_settings_or_panic("SETTLEMENT_RPC_URL").parse().unwrap(),
+ verifier_address: settings.get_settings_or_panic("MEMORY_PAGES_CONTRACT_ADDRESS").parse().unwrap(),
})
}
}
diff --git a/crates/settlement-clients/ethereum/src/config.rs b/crates/settlement-clients/ethereum/src/config.rs
index ae7c0a31..89b0dcd5 100644
--- a/crates/settlement-clients/ethereum/src/config.rs
+++ b/crates/settlement-clients/ethereum/src/config.rs
@@ -17,13 +17,9 @@ pub struct
EthereumSettlementConfig { impl SettlementConfig for EthereumSettlementConfig { fn new_with_settings(settings: &impl Settings) -> Self { - let rpc_url = settings - .get_settings(SETTLEMENT_RPC_URL) - .expect("Not able to get SETTLEMENT_RPC_URL from settings provided"); + let rpc_url = settings.get_settings_or_panic(SETTLEMENT_RPC_URL); let rpc_url = Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", SETTLEMENT_RPC_URL)); - let core_contract_address = settings - .get_settings(ENV_CORE_CONTRACT_ADDRESS) - .expect("Not able to get ENV_CORE_CONTRACT_ADDRESS from settings provided"); + let core_contract_address = settings.get_settings_or_panic(ENV_CORE_CONTRACT_ADDRESS); Self { rpc_url, core_contract_address } } } diff --git a/crates/settlement-clients/starknet/src/config.rs b/crates/settlement-clients/starknet/src/config.rs index 7d4e662e..3f3eb146 100644 --- a/crates/settlement-clients/starknet/src/config.rs +++ b/crates/settlement-clients/starknet/src/config.rs @@ -22,13 +22,9 @@ pub struct StarknetSettlementConfig { impl SettlementConfig for StarknetSettlementConfig { /// Should create a new instance of the DaConfig from the environment variables fn new_with_settings(settings: &impl Settings) -> Self { - let rpc_url = settings - .get_settings(ENV_STARKNET_RPC_URL) - .expect("Not able to get ENV_STARKNET_RPC_URL from settings provided"); + let rpc_url = settings.get_settings_or_panic(ENV_STARKNET_RPC_URL); let rpc_url = Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", ENV_STARKNET_RPC_URL)); - let core_contract_address = settings - .get_settings(ENV_CORE_CONTRACT_ADDRESS) - .expect("Not able to get ENV_CORE_CONTRACT_ADDRESS from settings provided"); + let core_contract_address = settings.get_settings_or_panic(ENV_CORE_CONTRACT_ADDRESS); let tx_finality_retry_delay_in_seconds: u64 = get_env_var_or_default(ENV_STARKNET_FINALITY_RETRY_DELAY_IN_SECS, DEFAULT_FINALITY_RETRY_DELAY) .parse() diff --git a/crates/settlement-clients/starknet/src/lib.rs b/crates/settlement-clients/starknet/src/lib.rs index 4ae098a0..8d2f5041 100644 --- a/crates/settlement-clients/starknet/src/lib.rs +++ b/crates/settlement-clients/starknet/src/lib.rs @@ -47,13 +47,11 @@ impl StarknetSettlementClient { let settlement_cfg = StarknetSettlementConfig::new_with_settings(settings); let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(settlement_cfg.rpc_url))); - let public_key = - settings.get_settings(ENV_PUBLIC_KEY).expect("Not able to get ENV_PUBLIC_KEY from given settings."); + let public_key = settings.get_settings_or_panic(ENV_PUBLIC_KEY); let signer_address = FieldElement::from_hex_be(&public_key).expect("invalid signer address"); // TODO: Very insecure way of building the signer. Needs to be adjusted. 
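// One possible direction for the TODO above (a sketch, not part of this patch): load the raw
// scalar from a file path named in settings instead of keeping the key itself in the process
// environment. `ENV_PRIVATE_KEY_PATH` is a hypothetical setting name used only here:
//
//     let key_path = settings.get_settings_or_panic("ENV_PRIVATE_KEY_PATH"); // hypothetical setting
//     let raw = std::fs::read_to_string(&key_path).expect("could not read signer key file");
//     let secret = FieldElement::from_hex_be(raw.trim()).expect("Invalid private key");
//     let signer = LocalWallet::from(SigningKey::from_secret_scalar(secret));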
- let private_key =
- settings.get_settings(ENV_PRIVATE_KEY).expect("Not able to get ENV_PRIVATE_KEY from given settings.");
+ let private_key = settings.get_settings_or_panic(ENV_PRIVATE_KEY);
let signer = FieldElement::from_hex_be(&private_key).expect("Invalid private key");
let signer = LocalWallet::from(SigningKey::from_secret_scalar(signer));
diff --git a/crates/utils/src/settings/env.rs b/crates/utils/src/settings/env.rs
index 8cc4ecd2..6a5962f0 100644
--- a/crates/utils/src/settings/env.rs
+++ b/crates/utils/src/settings/env.rs
@@ -1,11 +1,11 @@
use crate::env_utils::get_env_var_or_panic;
-use crate::settings::{Settings, SettingsProviderError};
+use crate::settings::Settings;
#[derive(Debug, Clone, Default)]
pub struct EnvSettingsProvider {}
impl Settings for EnvSettingsProvider {
- fn get_settings(&self, name: &'static str) -> Result<String, SettingsProviderError> {
- Ok(get_env_var_or_panic(name))
+ fn get_settings_or_panic(&self, name: &'static str) -> String {
+ get_env_var_or_panic(name)
}
}
diff --git a/crates/utils/src/settings/mod.rs b/crates/utils/src/settings/mod.rs
index ede9511c..05b0d787 100644
--- a/crates/utils/src/settings/mod.rs
+++ b/crates/utils/src/settings/mod.rs
@@ -7,5 +7,5 @@ pub enum SettingsProviderError {
}
pub trait Settings {
- fn get_settings(&self, name: &'static str) -> Result<String, SettingsProviderError>;
+ fn get_settings_or_panic(&self, name: &'static str) -> String;
}
diff --git a/e2e-tests/src/localstack.rs b/e2e-tests/src/localstack.rs
index b211235e..f5b89260 100644
--- a/e2e-tests/src/localstack.rs
+++ b/e2e-tests/src/localstack.rs
@@ -25,12 +25,11 @@ impl LocalStack {
pub async fn new() -> Self {
let region_provider = Region::new(get_env_var_or_panic("AWS_REGION"));
let config = aws_config::from_env().region(region_provider).load().await;
+ let provider_config = Arc::new(ProviderConfig::AWS(Box::from(config.clone())));
Self {
sqs_client: aws_sdk_sqs::Client::new(&config),
- s3_client: Box::new(
- AWSS3::new_with_settings(&EnvSettingsProvider {}, ProviderConfig::AWS(Arc::from(config.clone()))).await,
- ),
+ s3_client: Box::new(AWSS3::new_with_settings(&EnvSettingsProvider {}, provider_config).await),
event_bridge_client: aws_sdk_eventbridge::Client::new(&config),
}
}
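The reworked `Settings` trait above collapses every lookup into a single infallible call, which also makes test doubles trivial. A minimal in-memory provider might look like the sketch below; `MapSettingsProvider` and the `utils::settings` import path are assumptions for illustration, while the trait signature matches the change to crates/utils/src/settings/mod.rs.

use std::collections::HashMap;

use utils::settings::Settings; // import path assumed from the crate layout

#[derive(Debug, Clone, Default)]
pub struct MapSettingsProvider {
    values: HashMap<&'static str, String>,
}

impl Settings for MapSettingsProvider {
    fn get_settings_or_panic(&self, name: &'static str) -> String {
        // Mirrors EnvSettingsProvider: a missing key is a hard failure.
        self.values.get(name).unwrap_or_else(|| panic!("setting {} not found", name)).clone()
    }
}

Feeding such a provider to, say, `SharpConfig::new_with_settings` would exercise the panic path deterministically without touching process environment variables.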