diff --git a/.env.test b/.env.test index 4d09e45b..c0aa63e4 100644 --- a/.env.test +++ b/.env.test @@ -26,7 +26,6 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45 ##### SNS ##### ALERTS="sns" -AWS_SNS_REGION="us-east-1" AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn" AWS_SNS_ARN_NAME="madara-orchestrator-arn" diff --git a/CHANGELOG.md b/CHANGELOG.md index c79ba808..ba3c7300 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Changed +- AWS config built from TestConfigBuilder. +- Better TestConfigBuilder, with sync config clients. +- Drilled Config, removing dirty global reads. - refactor AWS config usage and clean .env files - GitHub's coverage CI yml file for localstack and db testing. - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder. diff --git a/Cargo.lock b/Cargo.lock index ccbd7a75..0911cf96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1073,12 +1073,6 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" -[[package]] -name = "arc-swap" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" - [[package]] name = "ark-ec" version = "0.4.2" @@ -6392,7 +6386,6 @@ name = "orchestrator" version = "0.1.0" dependencies = [ "alloy 0.2.1", - "arc-swap", "assert_matches", "async-std", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 14d3212a..272aa48a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ url = { version = "2.5.0", features = ["serde"] } uuid = { version = "1.7.0", features = ["v4", "serde"] } httpmock = { version = "0.7.0", features = ["remote"] } num-bigint = { version = "0.4.4" } -arc-swap = { version = "1.7.1" } num-traits = "0.2" lazy_static = "1.4.0" stark_evm_adapter = "0.1.1" diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 1d67b237..f824e6ef 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -13,7 +13,6 @@ path = "src/main.rs" [dependencies] alloy = { workspace = true } -arc-swap = { workspace = true } assert_matches = "1.5.0" async-std = "1.12.0" async-trait = { workspace = true } diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs index 9aa95f9a..0d885ffb 100644 --- a/crates/orchestrator/src/alerts/aws_sns/mod.rs +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -1,6 +1,6 @@ use crate::alerts::Alerts; use async_trait::async_trait; -use aws_sdk_sns::config::Region; +use aws_config::SdkConfig; use aws_sdk_sns::Client; use utils::env_utils::get_env_var_or_panic; @@ -10,10 +10,8 @@ pub struct AWSSNS { impl AWSSNS { /// To create a new SNS client - pub async fn new() -> Self { - let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); - let config = aws_config::from_env().region(Region::new(sns_region)).load().await; - AWSSNS { client: Client::new(&config) } + pub async fn new(config: &SdkConfig) -> Self { + AWSSNS { client: Client::new(config) } } } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 4f29cac5..213a24ef 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,27 +1,26 @@ use std::sync::Arc; -use crate::alerts::aws_sns::AWSSNS; -use crate::alerts::Alerts; -use crate::data_storage::aws_s3::config::AWSS3Config; -use crate::data_storage::aws_s3::AWSS3; -use crate::data_storage::{DataStorage, DataStorageConfig}; -use arc_swap::{ArcSwap, Guard}; use aws_config::SdkConfig; -use da_client_interface::{DaClient, DaConfig}; use dotenvy::dotenv; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Url}; + +use da_client_interface::{DaClient, DaConfig}; use ethereum_da_client::config::EthereumDaConfig; use ethereum_settlement_client::EthereumSettlementClient; use prover_client_interface::ProverClient; use settlement_client_interface::SettlementClient; use sharp_service::SharpProverService; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::{JsonRpcClient, Url}; use starknet_settlement_client::StarknetSettlementClient; -use tokio::sync::OnceCell; use utils::env_utils::get_env_var_or_panic; use utils::settings::default::DefaultSettingsProvider; use utils::settings::SettingsProvider; +use crate::alerts::aws_sns::AWSSNS; +use crate::alerts::Alerts; +use crate::data_storage::aws_s3::config::AWSS3Config; +use crate::data_storage::aws_s3::AWSS3; +use crate::data_storage::{DataStorage, DataStorageConfig}; use crate::database::mongodb::config::MongoDbConfig; use crate::database::mongodb::MongoDb; use crate::database::{Database, DatabaseConfig}; @@ -50,7 +49,7 @@ pub struct Config { } /// Initializes the app config -pub async fn init_config() -> Config { +pub async fn init_config() -> Arc { dotenv().ok(); // init starknet client @@ -68,7 +67,7 @@ pub async fn init_config() -> Config { // TODO: we use omniqueue for now which doesn't support loading AWS config // from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require // us stop using the generic omniqueue abstractions for message ack/nack - let queue = build_queue_client(&aws_config); + let queue = build_queue_client(); let da_client = build_da_client().await; @@ -77,10 +76,9 @@ pub async fn init_config() -> Config { let prover_client = build_prover_service(&settings_provider); let storage_client = build_storage_client(&aws_config).await; + let alerts_client = build_alert_client(&aws_config).await; - let alerts_client = build_alert_client().await; - - Config::new( + Arc::new(Config::new( Arc::new(provider), da_client, prover_client, @@ -89,7 +87,7 @@ pub async fn init_config() -> Config { queue, storage_client, alerts_client, - ) + )) } impl Config { @@ -149,33 +147,6 @@ impl Config { } } -/// The app config. It can be accessed from anywhere inside the service. -/// It's initialized only once. -/// We are using `ArcSwap` as it allow us to replace the new `Config` with -/// a new one which is required when running test cases. This approach was -/// inspired from here - https://github.com/matklad/once_cell/issues/127 -pub static CONFIG: OnceCell> = OnceCell::const_new(); - -/// Returns the app config. Initializes if not already done. -pub async fn config() -> Guard> { - let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await; - cfg.load() -} - -/// OnceCell only allows us to initialize the config once and that's how it should be on production. -/// However, when running tests, we often want to reinitialize because we want to clear the DB and -/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already -/// stored config inside `ArcSwap` with the new configuration and pool settings. -#[cfg(test)] -pub async fn config_force_init(config: Config) { - match CONFIG.get() { - Some(arc) => arc.store(Arc::new(config)), - None => { - CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await; - } - } -} - /// Builds the DA client based on the environment variable DA_LAYER pub async fn build_da_client() -> Box { match get_env_var_or_panic("DA_LAYER").as_str() { @@ -213,13 +184,13 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box Box { +pub async fn build_alert_client(aws_config: &SdkConfig) -> Box { match get_env_var_or_panic("ALERTS").as_str() { - "sns" => Box::new(AWSSNS::new().await), + "sns" => Box::new(AWSSNS::new(aws_config).await), _ => panic!("Unsupported Alert Client"), } } -pub fn build_queue_client(_aws_config: &SdkConfig) -> Box { +pub fn build_queue_client() -> Box { match get_env_var_or_panic("QUEUE_PROVIDER").as_str() { "sqs" => Box::new(SqsQueue {}), _ => panic!("Unsupported Queue Client"), diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 8148b113..2aa177ef 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::ops::{Add, Mul, Rem}; use std::str::FromStr; +use std::sync::Arc; use async_trait::async_trait; use color_eyre::eyre::WrapErr; @@ -13,11 +14,12 @@ use thiserror::Error; use tracing::log; use uuid::Uuid; -use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use super::{Job, JobError, OtherError}; use crate::config::Config; use crate::constants::BLOB_DATA_FILE_NAME; +use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; +use super::{Job, JobError, OtherError}; + lazy_static! { /// EIP-4844 BLS12-381 modulus. /// @@ -59,7 +61,7 @@ pub struct DaJob; impl Job for DaJob { async fn create_job( &self, - _config: &Config, + _config: Arc, internal_id: String, metadata: HashMap, ) -> Result { @@ -74,7 +76,7 @@ impl Job for DaJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn process_job(&self, config: Arc, job: &mut JobItem) -> Result { let block_no = job .internal_id .parse::() @@ -95,7 +97,7 @@ impl Job for DaJob { MaybePendingStateUpdate::Update(state_update) => state_update, }; // constructing the data from the rpc - let blob_data = state_update_to_blob_data(block_no, state_update, config) + let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()) .await .map_err(|e| JobError::Other(OtherError(e)))?; // transforming the data so that we can apply FFT on this. @@ -135,7 +137,7 @@ impl Job for DaJob { Ok(external_id) } - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn verify_job(&self, config: Arc, job: &mut JobItem) -> Result { Ok(config .da_client() .verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?) @@ -230,7 +232,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec) -> Result pub async fn state_update_to_blob_data( block_no: u64, state_update: StateUpdate, - config: &Config, + config: Arc, ) -> color_eyre::Result> { let state_diff = state_update.state_diff; let mut blob_data: Vec = vec![ @@ -308,7 +310,7 @@ pub async fn state_update_to_blob_data( } /// To store the blob data using the storage client with path /blob_data.txt -async fn store_blob_data(blob_data: Vec, block_number: u64, config: &Config) -> Result<(), JobError> { +async fn store_blob_data(blob_data: Vec, block_number: u64, config: Arc) -> Result<(), JobError> { let storage_client = config.storage(); let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; let data_blob_big_uint = convert_to_biguint(blob_data.clone()); @@ -370,18 +372,12 @@ fn da_word(class_flag: bool, nonce_change: Option, num_changes: u6 #[cfg(test)] pub mod test { - use crate::jobs::da_job::da_word; use std::fs; use std::fs::File; use std::io::Read; - use std::sync::Arc; - use crate::config::config; - use crate::data_storage::MockDataStorage; - use crate::tests::config::TestConfigBuilder; use ::serde::{Deserialize, Serialize}; use color_eyre::Result; - use da_client_interface::MockDaClient; use httpmock::prelude::*; use majin_blob_core::blob; use majin_blob_types::serde; @@ -393,6 +389,11 @@ pub mod test { use starknet_core::types::{FieldElement, StateUpdate}; use url::Url; + use da_client_interface::MockDaClient; + + use crate::data_storage::MockDataStorage; + use crate::jobs::da_job::da_word; + /// Tests `da_word` function with various inputs for class flag, new nonce, and number of changes. /// Verifies that `da_word` produces the correct FieldElement based on the provided parameters. /// Uses test cases with different combinations of inputs and expected output strings. @@ -444,7 +445,10 @@ pub mod test { #[case] file_path: &str, #[case] nonce_file_path: &str, ) { - use crate::jobs::da_job::{convert_to_biguint, state_update_to_blob_data}; + use crate::{ + jobs::da_job::{convert_to_biguint, state_update_to_blob_data}, + tests::config::TestConfigBuilder, + }; let server = MockServer::start(); let mut da_client = MockDaClient::new(); @@ -462,19 +466,17 @@ pub mod test { )); // mock block number (madara) : 5 - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_da_client(Box::new(da_client)) - .mock_storage_client(Box::new(storage_client)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_da_client(da_client.into()) + .configure_storage_client(storage_client.into()) .build() .await; - let config = config().await; - get_nonce_attached(&server, nonce_file_path); let state_update = read_state_update_from_file(state_update_file_path).expect("issue while reading"); - let blob_data = state_update_to_blob_data(block_no, state_update, &config) + let blob_data = state_update_to_blob_data(block_no, state_update, services.config) .await .expect("issue while converting state update to blob data"); diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 11da33d8..1508913d 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; use std::fmt; +use std::sync::Arc; use std::time::Duration; -use crate::config::{config, Config}; +use crate::config::Config; use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError}; @@ -100,18 +101,18 @@ pub trait Job: Send + Sync { /// Should build a new job item and return it async fn create_job( &self, - config: &Config, + config: Arc, internal_id: String, metadata: HashMap, ) -> Result; /// Should process the job and return the external_id which can be used to /// track the status of the job. For example, a DA job will submit the state diff /// to the DA layer and return the txn hash. - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result; + async fn process_job(&self, config: Arc, job: &mut JobItem) -> Result; /// Should verify the job and return the status of the verification. For example, /// a DA job will verify the inclusion of the state diff in the DA layer and return /// the status of the verification. - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result; + async fn verify_job(&self, config: Arc, job: &mut JobItem) -> Result; /// Should return the maximum number of attempts to process the job. A new attempt is made /// every time the verification returns `JobVerificationStatus::Rejected` fn max_process_attempts(&self) -> u64; @@ -127,8 +128,8 @@ pub async fn create_job( job_type: JobType, internal_id: String, metadata: HashMap, + config: Arc, ) -> Result<(), JobError> { - let config = config().await; let existing_job = config .database() .get_job_by_internal_id_and_type(internal_id.as_str(), &job_type) @@ -139,19 +140,17 @@ pub async fn create_job( } let job_handler = factory::get_job_handler(&job_type).await; - let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?; - + let job_item = job_handler.create_job(config.clone(), internal_id, metadata).await?; config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; - add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?; + add_job_to_process_queue(job_item.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; Ok(()) } /// Processes the job, increments the process attempt count and updates the status of the job in the /// DB. It then adds the job to the verification queue. -pub async fn process_job(id: Uuid) -> Result<(), JobError> { - let config = config().await; - let mut job = get_job(id).await?; +pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let mut job = get_job(id, config.clone()).await?; match job.status { // we only want to process jobs that are in the created or verification failed state. @@ -173,12 +172,11 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { .map_err(|e| JobError::Other(OtherError(e)))?; let job_handler = factory::get_job_handler(&job.job_type).await; - let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; - + let external_id = job_handler.process_job(config.clone(), &mut job).await?; let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; // Fetching the job again because update status above will update the job version - let mut job_updated = get_job(id).await?; + let mut job_updated = get_job(id, config.clone()).await?; job_updated.external_id = external_id.into(); job_updated.status = JobStatus::PendingVerification; @@ -186,9 +184,13 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { config.database().update_job(&job_updated).await.map_err(|e| JobError::Other(OtherError(e)))?; - add_job_to_verification_queue(job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds())) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; + add_job_to_verification_queue( + job.id, + Duration::from_secs(job_handler.verification_polling_delay_seconds()), + config.clone(), + ) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; Ok(()) } @@ -197,9 +199,8 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { /// retries processing the job if the max attempts have not been exceeded. If the max attempts have /// been exceeded, it marks the job as timed out. If the verification is still pending, it pushes the /// job back to the queue. -pub async fn verify_job(id: Uuid) -> Result<(), JobError> { - let config = config().await; - let mut job = get_job(id).await?; +pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let mut job = get_job(id, config.clone()).await?; match job.status { JobStatus::PendingVerification => { @@ -211,7 +212,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { } let job_handler = factory::get_job_handler(&job.job_type).await; - let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await?; + let verification_status = job_handler.verify_job(config.clone(), &mut job).await?; match verification_status { JobVerificationStatus::Verified => { @@ -239,7 +240,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { job.id, process_attempts + 1 ); - add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; + add_job_to_process_queue(job.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); } } @@ -261,6 +262,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { add_job_to_verification_queue( job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds()), + config.clone(), ) .await .map_err(|e| JobError::Other(OtherError(e)))?; @@ -272,10 +274,8 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { /// Terminates the job and updates the status of the job in the DB. /// Logs error if the job status `Completed` is existing on DL queue. -pub async fn handle_job_failure(id: Uuid) -> Result<(), JobError> { - let config = config().await; - - let mut job = get_job(id).await?.clone(); +pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), JobError> { + let mut job = get_job(id, config.clone()).await?.clone(); let mut metadata = job.metadata.clone(); if job.status == JobStatus::Completed { @@ -297,8 +297,7 @@ pub async fn handle_job_failure(id: Uuid) -> Result<(), JobError> { Ok(()) } -async fn get_job(id: Uuid) -> Result { - let config = config().await; +async fn get_job(id: Uuid, config: Arc) -> Result { let job = config.database().get_job_by_id(id).await.map_err(|e| JobError::Other(OtherError(e)))?; match job { Some(job) => Ok(job), diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index ab6cf1c1..a6b0761a 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; use cairo_vm::vm::runners::cairo_pie::CairoPie; @@ -34,7 +35,7 @@ pub struct ProvingJob; impl Job for ProvingJob { async fn create_job( &self, - _config: &Config, + _config: Arc, internal_id: String, metadata: HashMap, ) -> Result { @@ -49,7 +50,7 @@ impl Job for ProvingJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn process_job(&self, config: Arc, job: &mut JobItem) -> Result { // Cairo Pie path in s3 storage client let cairo_pie_path = job.internal_id.to_string() + "/pie.zip"; let cairo_pie_file = config @@ -70,7 +71,7 @@ impl Job for ProvingJob { Ok(external_id) } - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn verify_job(&self, config: Arc, job: &mut JobItem) -> Result { let task_id: String = job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?.into(); let task_status = config .prover_client() diff --git a/crates/orchestrator/src/jobs/register_proof_job/mod.rs b/crates/orchestrator/src/jobs/register_proof_job/mod.rs index e9a076dc..a1f88101 100644 --- a/crates/orchestrator/src/jobs/register_proof_job/mod.rs +++ b/crates/orchestrator/src/jobs/register_proof_job/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; use color_eyre::Result; @@ -16,7 +17,7 @@ pub struct RegisterProofJob; impl Job for RegisterProofJob { async fn create_job( &self, - _config: &Config, + _config: Arc, internal_id: String, metadata: HashMap, ) -> Result { @@ -33,14 +34,14 @@ impl Job for RegisterProofJob { }) } - async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn process_job(&self, _config: Arc, _job: &mut JobItem) -> Result { // Get proof from storage and submit on chain for verification // We need to implement a generic trait for this to support multiple // base layers todo!() } - async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn verify_job(&self, _config: Arc, _job: &mut JobItem) -> Result { // verify that the proof transaction has been included on chain todo!() } diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs index 8c532bb2..790fc0d8 100644 --- a/crates/orchestrator/src/jobs/snos_job/mod.rs +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; use color_eyre::Result; @@ -16,7 +17,7 @@ pub struct SnosJob; impl Job for SnosJob { async fn create_job( &self, - _config: &Config, + _config: Arc, internal_id: String, metadata: HashMap, ) -> Result { @@ -31,14 +32,14 @@ impl Job for SnosJob { }) } - async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn process_job(&self, _config: Arc, _job: &mut JobItem) -> Result { // 1. Fetch SNOS input data from Madara // 2. Import SNOS in Rust and execute it with the input data // 3. Store the received PIE in DB todo!() } - async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn verify_job(&self, _config: Arc, _job: &mut JobItem) -> Result { // No need for verification as of now. If we later on decide to outsource SNOS run // to another service, verify_job can be used to poll on the status of the job todo!() diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 48e4aeb9..a12267dc 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -1,6 +1,7 @@ pub mod utils; use std::collections::HashMap; +use std::sync::Arc; use ::utils::collections::{has_dup, is_sorted}; use async_trait::async_trait; @@ -18,7 +19,7 @@ use super::constants::{ }; use super::{JobError, OtherError}; -use crate::config::{config, Config}; +use crate::config::Config; use crate::constants::SNOS_OUTPUT_FILE_NAME; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; @@ -72,7 +73,7 @@ pub struct StateUpdateJob; impl Job for StateUpdateJob { async fn create_job( &self, - _config: &Config, + _config: Arc, internal_id: String, metadata: HashMap, ) -> Result { @@ -89,7 +90,7 @@ impl Job for StateUpdateJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn process_job(&self, config: Arc, job: &mut JobItem) -> Result { let attempt_no = job .metadata .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) @@ -99,7 +100,7 @@ impl Job for StateUpdateJob { // Read the metadata to get the blocks for which state update will be performed. // We assume that blocks nbrs are formatted as follow: "2,3,4,5,6". let mut block_numbers = self.get_block_numbers_from_metadata(job)?; - self.validate_block_numbers(config, &block_numbers).await?; + self.validate_block_numbers(config.clone(), &block_numbers).await?; // If we had a block state update failing last run, we recover from this block if let Some(last_failed_block) = job.metadata.get(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO) { @@ -113,8 +114,8 @@ impl Job for StateUpdateJob { let mut sent_tx_hashes: Vec = Vec::with_capacity(block_numbers.len()); for block_no in block_numbers.iter() { - let snos = self.fetch_snos_for_block(*block_no).await; - let tx_hash = self.update_state_for_block(config, *block_no, snos, nonce).await.map_err(|e| { + let snos = self.fetch_snos_for_block(*block_no, config.clone()).await; + let tx_hash = self.update_state_for_block(config.clone(), *block_no, snos, nonce).await.map_err(|e| { job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string()); self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); @@ -139,7 +140,7 @@ impl Job for StateUpdateJob { /// Status will be verified if: /// 1. the last settlement tx hash is successful, /// 2. the expected last settled block from our configuration is indeed the one found in the provider. - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn verify_job(&self, config: Arc, job: &mut JobItem) -> Result { let attempt_no = job .metadata .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) @@ -242,7 +243,7 @@ impl StateUpdateJob { } /// Validate that the list of block numbers to process is valid. - async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> Result<(), JobError> { + async fn validate_block_numbers(&self, config: Arc, block_numbers: &[u64]) -> Result<(), JobError> { if block_numbers.is_empty() { Err(StateUpdateError::BlockNumberNotFound)?; } @@ -264,7 +265,7 @@ impl StateUpdateJob { /// Update the state for the corresponding block using the settlement layer. async fn update_state_for_block( &self, - config: &Config, + config: Arc, block_no: u64, snos: StarknetOsOutput, nonce: u64, @@ -273,7 +274,9 @@ impl StateUpdateJob { let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { unimplemented!("update_state_for_block not implemented as of now for calldata DA.") } else if snos.use_kzg_da == Felt252::ONE { - let blob_data = fetch_blob_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?; + let blob_data = fetch_blob_data_for_block(block_no, config.clone()) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; // Fetching nonce before the transaction is run // Sending update_state transaction from the settlement client @@ -288,8 +291,7 @@ impl StateUpdateJob { } /// Retrieves the SNOS output for the corresponding block. - async fn fetch_snos_for_block(&self, block_no: u64) -> StarknetOsOutput { - let config = config().await; + async fn fetch_snos_for_block(&self, block_no: u64, config: Arc) -> StarknetOsOutput { let storage_client = config.storage(); let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME; let snos_output_bytes = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block"); diff --git a/crates/orchestrator/src/jobs/state_update_job/utils.rs b/crates/orchestrator/src/jobs/state_update_job/utils.rs index 1d92c9a4..87fca7ca 100644 --- a/crates/orchestrator/src/jobs/state_update_job/utils.rs +++ b/crates/orchestrator/src/jobs/state_update_job/utils.rs @@ -1,10 +1,11 @@ -use crate::config::config; +use std::sync::Arc; + +use crate::config::Config; use crate::constants::BLOB_DATA_FILE_NAME; use color_eyre::eyre::eyre; /// Fetching the blob data (stored in remote storage during DA job) for a particular block -pub async fn fetch_blob_data_for_block(block_number: u64) -> color_eyre::Result>> { - let config = config().await; +pub async fn fetch_blob_data_for_block(block_number: u64, config: Arc) -> color_eyre::Result>> { let storage_client = config.storage(); let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; let blob_data = storage_client.get_data(&key).await?; diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 1739f453..dfeecd6d 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -1,5 +1,5 @@ use dotenvy::dotenv; -use orchestrator::config::config; +use orchestrator::config::init_config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; use utils::env_utils::get_env_var_or_default; @@ -11,7 +11,8 @@ async fn main() { tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).with_target(false).init(); // initial config setup - config().await; + let config = init_config().await; + let host = get_env_var_or_default("HOST", "127.0.0.1"); let port = get_env_var_or_default("PORT", "3000").parse::().expect("PORT must be a u16"); let address = format!("{}:{}", host, port); @@ -19,7 +20,7 @@ async fn main() { let app = app_router(); // init consumer - init_consumers().await.expect("Failed to init consumers"); + init_consumers(config).await.expect("Failed to init consumers"); tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 44d492da..05b9e6fa 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -1,16 +1,24 @@ use std::future::Future; +use std::sync::Arc; use std::time::Duration; use color_eyre::eyre::Context; use color_eyre::Result as EyreResult; use omniqueue::{Delivery, QueueError}; use serde::{Deserialize, Serialize}; +use thiserror::Error; use tokio::time::sleep; use tracing::log; use uuid::Uuid; -use crate::config::config; +use crate::config::Config; use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherError}; +use crate::workers::data_submission_worker::DataSubmissionWorker; +use crate::workers::proof_registration::ProofRegistrationWorker; +use crate::workers::proving::ProvingWorker; +use crate::workers::snos::SnosWorker; +use crate::workers::update_state::UpdateStateWorker; +use crate::workers::Worker; pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; @@ -20,14 +28,6 @@ pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failu // Queues for SNOS worker trigger listening pub const WORKER_TRIGGER_QUEUE: &str = "madara_orchestrator_worker_trigger_queue"; -use crate::workers::data_submission_worker::DataSubmissionWorker; -use crate::workers::proof_registration::ProofRegistrationWorker; -use crate::workers::proving::ProvingWorker; -use crate::workers::snos::SnosWorker; -use crate::workers::update_state::UpdateStateWorker; -use crate::workers::Worker; -use thiserror::Error; - #[derive(Error, Debug, PartialEq)] pub enum ConsumptionError { #[error("Failed to consume message from queue, error {error_msg:?}")] @@ -67,23 +67,28 @@ enum DeliveryReturnType { NoMessage, } -pub async fn add_job_to_process_queue(id: Uuid) -> EyreResult<()> { +pub async fn add_job_to_process_queue(id: Uuid, config: Arc) -> EyreResult<()> { log::info!("Adding job with id {:?} to processing queue", id); - add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None).await + add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None, config).await } -pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration) -> EyreResult<()> { +pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration, config: Arc) -> EyreResult<()> { log::info!("Adding job with id {:?} to verification queue", id); - add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay)).await + add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay), config).await } -pub async fn consume_job_from_queue(queue: String, handler: F) -> Result<(), ConsumptionError> +pub async fn consume_job_from_queue( + queue: String, + handler: F, + config: Arc, +) -> Result<(), ConsumptionError> where - F: FnOnce(Uuid) -> Fut, + F: FnOnce(Uuid, Arc) -> Fut, Fut: Future>, { log::info!("Consuming from queue {:?}", queue); - let delivery = get_delivery_from_queue(&queue).await?; + + let delivery = get_delivery_from_queue(&queue, config.clone()).await?; let message = match delivery { DeliveryReturnType::Message(message) => message, @@ -93,7 +98,7 @@ where let job_message = parse_job_message(&message)?; if let Some(job_message) = job_message { - handle_job_message(job_message, message, handler).await?; + handle_job_message(job_message, message, handler, config).await?; } Ok(()) @@ -104,13 +109,14 @@ where pub async fn consume_worker_trigger_messages_from_queue( queue: String, handler: F, + config: Arc, ) -> Result<(), ConsumptionError> where - F: FnOnce(Box) -> Fut, + F: FnOnce(Box, Arc) -> Fut, Fut: Future>, { log::info!("Consuming from queue {:?}", queue); - let delivery = get_delivery_from_queue(&queue).await?; + let delivery = get_delivery_from_queue(&queue, config.clone()).await?; let message = match delivery { DeliveryReturnType::Message(message) => message, @@ -120,7 +126,7 @@ where let job_message = parse_worker_message(&message)?; if let Some(job_message) = job_message { - handle_worker_message(job_message, message, handler).await?; + handle_worker_message(job_message, message, handler, config).await?; } Ok(()) @@ -144,14 +150,15 @@ async fn handle_job_message( job_message: JobQueueMessage, message: Delivery, handler: F, + config: Arc, ) -> Result<(), ConsumptionError> where - F: FnOnce(Uuid) -> Fut, + F: FnOnce(Uuid, Arc) -> Fut, Fut: Future>, { log::info!("Handling job with id {:?}", job_message.id); - match handler(job_message.id).await { + match handler(job_message.id, config.clone()).await { Ok(_) => { message .ack() @@ -163,8 +170,7 @@ where } Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); - config() - .await + config .alerts() .send_alert_message(e.to_string()) .await @@ -188,15 +194,16 @@ async fn handle_worker_message( job_message: WorkerTriggerMessage, message: Delivery, handler: F, + config: Arc, ) -> Result<(), ConsumptionError> where - F: FnOnce(Box) -> Fut, + F: FnOnce(Box, Arc) -> Fut, Fut: Future>, { log::info!("Handling worker trigger for worker type : {:?}", job_message.worker); let worker_handler = get_worker_handler_from_worker_trigger_type(job_message.worker.clone()); - match handler(worker_handler).await { + match handler(worker_handler, config.clone()).await { Ok(_) => { message .ack() @@ -208,8 +215,7 @@ where } Err(e) => { log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e); - config() - .await + config .alerts() .send_alert_message(e.to_string()) .await @@ -236,8 +242,8 @@ fn get_worker_handler_from_worker_trigger_type(worker_trigger_type: WorkerTrigge } /// To get the delivery from the message queue using the queue name -async fn get_delivery_from_queue(queue: &str) -> Result { - match config().await.queue().consume_message_from_queue(queue.to_string()).await { +async fn get_delivery_from_queue(queue: &str, config: Arc) -> Result { + match config.queue().consume_message_from_queue(queue.to_string()).await { Ok(d) => Ok(DeliveryReturnType::Message(d)), Err(QueueError::NoData) => Ok(DeliveryReturnType::NoMessage), Err(e) => Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }), @@ -245,10 +251,11 @@ async fn get_delivery_from_queue(queue: &str) -> Result { + ($queue_type :expr, $handler : expr, $consume_function: expr, $config :expr) => { + let config_clone = $config.clone(); tokio::spawn(async move { loop { - match $consume_function($queue_type, $handler).await { + match $consume_function($queue_type, $handler, config_clone.clone()).await { Ok(_) => {} Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e), } @@ -258,22 +265,21 @@ macro_rules! spawn_consumer { }; } -pub async fn init_consumers() -> Result<(), JobError> { - spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue); - spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue); - spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue); - spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue); +pub async fn init_consumers(config: Arc) -> Result<(), JobError> { + spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); + spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue, config.clone()); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue, config.clone()); + spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue, config); Ok(()) } /// To spawn the worker by passing the worker struct -async fn spawn_worker(worker: Box) -> color_eyre::Result<()> { - worker.run_worker_if_enabled().await.expect("Error in running the worker."); +async fn spawn_worker(worker: Box, config: Arc) -> color_eyre::Result<()> { + worker.run_worker_if_enabled(config).await.expect("Error in running the worker."); Ok(()) } -async fn add_job_to_queue(id: Uuid, queue: String, delay: Option) -> EyreResult<()> { - let config = config().await; +async fn add_job_to_queue(id: Uuid, queue: String, delay: Option, config: Arc) -> EyreResult<()> { let message = JobQueueMessage { id }; config.queue().send_message_to_queue(queue, serde_json::to_string(&message)?, delay).await?; Ok(()) diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 48599364..fd2215cb 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -1,14 +1,14 @@ pub mod job_queue; pub mod sqs; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use color_eyre::Result as EyreResult; use mockall::automock; use omniqueue::{Delivery, QueueError}; -use crate::jobs::JobError; +use crate::{config::Config, jobs::JobError}; /// The QueueProvider trait is used to define the methods that a queue /// should implement to be used as a queue for the orchestrator. The @@ -20,6 +20,6 @@ pub trait QueueProvider: Send + Sync { async fn consume_message_from_queue(&self, queue: String) -> std::result::Result; } -pub async fn init_consumers() -> Result<(), JobError> { - job_queue::init_consumers().await +pub async fn init_consumers(config: Arc) -> Result<(), JobError> { + job_queue::init_consumers(config).await } diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs index 2026061b..f40940b2 100644 --- a/crates/orchestrator/src/tests/alerts/mod.rs +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -1,25 +1,26 @@ -use crate::config::config; -use crate::tests::common::{get_sns_client, get_sqs_client}; -use crate::tests::config::TestConfigBuilder; +use std::time::Duration; + use aws_sdk_sqs::types::QueueAttributeName::QueueArn; use rstest::rstest; -use std::time::Duration; use tokio::time::sleep; + use utils::env_utils::get_env_var_or_panic; +use crate::tests::common::{get_sns_client, get_sqs_client}; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; #[rstest] #[tokio::test] async fn sns_alert_subscribe_to_topic_receive_alert_works() { - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new().configure_alerts(ConfigType::Actual).build().await; let sqs_client = get_sqs_client().await; let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); let queue_url = queue.queue_url().unwrap(); - let sns_client = get_sns_client().await; - let config = config().await; + let sns_client = get_sns_client(&services.aws_config).await; let queue_attributes = sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap(); @@ -39,7 +40,7 @@ async fn sns_alert_subscribe_to_topic_receive_alert_works() { let message_to_send = "Hello World :)"; // Getting sns client from the module - let alerts_client = config.alerts(); + let alerts_client = services.config.alerts(); // Sending the alert message alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap(); diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 83cf7e68..c5c70261 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -3,7 +3,7 @@ pub mod constants; use std::collections::HashMap; use ::uuid::Uuid; -use aws_config::Region; +use aws_config::{Region, SdkConfig}; use aws_sdk_sns::error::SdkError; use aws_sdk_sns::operation::create_topic::CreateTopicError; use mongodb::Client; @@ -43,16 +43,14 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))] job_item } -pub async fn create_sns_arn() -> Result<(), SdkError> { - let sns_client = get_sns_client().await; +pub async fn create_sns_arn(aws_config: &SdkConfig) -> Result<(), SdkError> { + let sns_client = get_sns_client(aws_config).await; sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?; Ok(()) } -pub async fn get_sns_client() -> aws_sdk_sns::client::Client { - let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); - let config = aws_config::from_env().region(Region::new(sns_region)).load().await; - aws_sdk_sns::Client::new(&config) +pub async fn get_sns_client(aws_config: &SdkConfig) -> aws_sdk_sns::client::Client { + aws_sdk_sns::Client::new(aws_config) } pub async fn drop_database() -> color_eyre::Result<()> { diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 8424d6d7..a04b08a7 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,49 +1,97 @@ use std::sync::Arc; -use crate::config::{ - build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, Config, -}; -use crate::data_storage::DataStorage; -use da_client_interface::DaClient; +use aws_config::SdkConfig; use httpmock::MockServer; - -use crate::alerts::Alerts; -use prover_client_interface::ProverClient; -use settlement_client_interface::SettlementClient; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Url}; + +use da_client_interface::{DaClient, MockDaClient}; +use prover_client_interface::{MockProverClient, ProverClient}; +use settlement_client_interface::{MockSettlementClient, SettlementClient}; use utils::env_utils::get_env_var_or_panic; use utils::settings::default::DefaultSettingsProvider; -use crate::database::mongodb::config::MongoDbConfig; -use crate::database::mongodb::MongoDb; -use crate::database::{Database, DatabaseConfig}; -use crate::queue::sqs::SqsQueue; -use crate::queue::QueueProvider; -use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client}; +use crate::alerts::{Alerts, MockAlerts}; +use crate::config::{ + build_alert_client, build_da_client, build_database_client, build_prover_service, build_queue_client, + build_settlement_client, Config, +}; +use crate::data_storage::{DataStorage, MockDataStorage}; +use crate::database::{Database, MockDatabase}; +use crate::queue::{MockQueueProvider, QueueProvider}; +use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database}; + +use super::common::get_storage_client; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement. // Eg: We want to mock only the da client and leave rest to be as it is, use mock_da_client. +pub enum MockType { + StarknetClient(Arc>), + DaClient(Box), + ProverClient(Box), + SettlementClient(Box), + + Alerts(Box), + Database(Box), + Queue(Box), + Storage(Box), +} + +// By default, everything is on Dummy. +pub enum ConfigType { + Mock(MockType), + Actual, + Dummy, +} + +impl From> for ConfigType { + fn from(client: JsonRpcClient) -> Self { + ConfigType::Mock(MockType::StarknetClient(Arc::new(client))) + } +} + +macro_rules! impl_mock_from { + ($($mock_type:ty => $variant:ident),+) => { + $( + impl From<$mock_type> for ConfigType { + fn from(client: $mock_type) -> Self { + ConfigType::Mock(MockType::$variant(Box::new(client))) + } + } + )+ + }; +} + +impl_mock_from! { + MockProverClient => ProverClient, + MockDatabase => Database, + MockDaClient => DaClient, + MockQueueProvider => Queue, + MockDataStorage => Storage, + MockSettlementClient => SettlementClient +} + // TestBuilder for Config pub struct TestConfigBuilder { /// The starknet client to get data from the node - starknet_client: Option>>, + starknet_client_type: ConfigType, /// The DA client to interact with the DA layer - da_client: Option>, - /// The service that produces proof and registers it onchain - prover_client: Option>, + da_client_type: ConfigType, + /// The service that produces proof and registers it on chain + prover_client_type: ConfigType, /// Settlement client - settlement_client: Option>, + settlement_client_type: ConfigType, + + /// Alerts client + alerts_type: ConfigType, /// The database client - database: Option>, + database_type: ConfigType, /// Queue client - queue: Option>, + queue_type: ConfigType, /// Storage client - storage: Option>, - /// Alerts client - alerts: Option>, + storage_type: ConfigType, } impl Default for TestConfigBuilder { @@ -52,128 +100,209 @@ impl Default for TestConfigBuilder { } } +pub struct TestConfigBuilderReturns { + pub server: Option, + pub config: Arc, + pub aws_config: SdkConfig, +} impl TestConfigBuilder { /// Create a new config pub fn new() -> TestConfigBuilder { TestConfigBuilder { - starknet_client: None, - da_client: None, - prover_client: None, - settlement_client: None, - database: None, - queue: None, - storage: None, - alerts: None, + starknet_client_type: ConfigType::Dummy, + da_client_type: ConfigType::Dummy, + prover_client_type: ConfigType::Dummy, + settlement_client_type: ConfigType::Dummy, + database_type: ConfigType::Dummy, + queue_type: ConfigType::Dummy, + storage_type: ConfigType::Dummy, + alerts_type: ConfigType::Dummy, } } - pub fn mock_da_client(mut self, da_client: Box) -> TestConfigBuilder { - self.da_client = Some(da_client); + pub fn configure_da_client(mut self, da_client_type: ConfigType) -> TestConfigBuilder { + self.da_client_type = da_client_type; self } - pub fn mock_db_client(mut self, db_client: Box) -> TestConfigBuilder { - self.database = Some(db_client); + pub fn configure_settlement_client(mut self, settlement_client_type: ConfigType) -> TestConfigBuilder { + self.settlement_client_type = settlement_client_type; self } - pub fn mock_settlement_client(mut self, settlement_client: Box) -> TestConfigBuilder { - self.settlement_client = Some(settlement_client); + pub fn configure_starknet_client(mut self, starknet_client_type: ConfigType) -> TestConfigBuilder { + self.starknet_client_type = starknet_client_type; self } - pub fn mock_starknet_client(mut self, starknet_client: Arc>) -> TestConfigBuilder { - self.starknet_client = Some(starknet_client); + pub fn configure_prover_client(mut self, prover_client_type: ConfigType) -> TestConfigBuilder { + self.prover_client_type = prover_client_type; self } - pub fn mock_prover_client(mut self, prover_client: Box) -> TestConfigBuilder { - self.prover_client = Some(prover_client); + pub fn configure_alerts(mut self, alert_option: ConfigType) -> TestConfigBuilder { + self.alerts_type = alert_option; self } - pub fn mock_storage_client(mut self, storage_client: Box) -> TestConfigBuilder { - self.storage = Some(storage_client); + pub fn configure_storage_client(mut self, storage_client_option: ConfigType) -> TestConfigBuilder { + self.storage_type = storage_client_option; self } - pub fn mock_queue(mut self, queue: Box) -> TestConfigBuilder { - self.queue = Some(queue); + pub fn configure_queue_client(mut self, queue_type: ConfigType) -> TestConfigBuilder { + self.queue_type = queue_type; self } - - pub fn mock_alerts(mut self, alerts: Box) -> TestConfigBuilder { - self.alerts = Some(alerts); + pub fn configure_database(mut self, database_type: ConfigType) -> TestConfigBuilder { + self.database_type = database_type; self } - pub async fn build(mut self) -> MockServer { + pub async fn build(self) -> TestConfigBuilderReturns { dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); - let server = MockServer::start(); - let settings_provider = DefaultSettingsProvider {}; + let aws_config = aws_config::load_from_env().await; - // init database - if self.database.is_none() { - self.database = Some(Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await)); - } + use std::sync::Arc; - // init the DA client - if self.da_client.is_none() { - self.da_client = Some(build_da_client().await); - } + let TestConfigBuilder { + starknet_client_type, + alerts_type, + da_client_type, + prover_client_type, + settlement_client_type, + database_type, + queue_type, + storage_type, + } = self; - // init the Settings client - if self.settlement_client.is_none() { - self.settlement_client = Some(build_settlement_client(&settings_provider).await); - } + let (starknet_client, server) = init_starknet_client(starknet_client_type).await; + let alerts = init_alerts(alerts_type, &aws_config).await; + let da_client = init_da_client(da_client_type).await; - // init the storage client - if self.storage.is_none() { - self.storage = Some(get_storage_client().await); - match get_env_var_or_panic("DATA_STORAGE").as_str() { - "s3" => self - .storage - .as_ref() - .unwrap() - .build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")) - .await - .unwrap(), - _ => panic!("Unsupported Storage Client"), - } - } + let settlement_client = init_settlement_client(settlement_client_type).await; - if self.alerts.is_none() { - self.alerts = Some(build_alert_client().await); - } + let prover_client = init_prover_client(prover_client_type).await; + // External Dependencies + let storage = init_storage_client(storage_type).await; + let database = init_database(database_type).await; + let queue = init_queue_client(queue_type).await; // Deleting and Creating the queues in sqs. create_sqs_queues().await.expect("Not able to delete and create the queues."); // Deleting the database drop_database().await.expect("Unable to drop the database."); // Creating the SNS ARN - create_sns_arn().await.expect("Unable to create the sns arn"); - - let config = Config::new( - self.starknet_client.unwrap_or_else(|| { - let provider = JsonRpcClient::new(HttpTransport::new( - Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), - )); - Arc::new(provider) - }), - self.da_client.unwrap(), - self.prover_client.unwrap_or_else(|| build_prover_service(&settings_provider)), - self.settlement_client.unwrap(), - self.database.unwrap(), - self.queue.unwrap_or_else(|| Box::new(SqsQueue {})), - self.storage.unwrap(), - self.alerts.unwrap(), - ); - - drop_database().await.unwrap(); - - config_force_init(config).await; - - server + create_sns_arn(&aws_config).await.expect("Unable to create the sns arn"); + + let config = Arc::new(Config::new( + starknet_client, + da_client, + prover_client, + settlement_client, + database, + queue, + storage, + alerts, + )); + + TestConfigBuilderReturns { server, config, aws_config } + } +} + +macro_rules! impl_mock_match { + ($client:expr, $mock_type:path, $client_type:ty) => { + match $client { + $mock_type(client) => client, + _ => panic!(concat!("Mock client is not a ", stringify!($client_type))), + } + }; +} + +async fn init_da_client(service: ConfigType) -> Box { + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::DaClient, DaClient), + ConfigType::Actual => build_da_client().await, + ConfigType::Dummy => Box::new(MockDaClient::new()), + } +} + +async fn init_settlement_client(service: ConfigType) -> Box { + let settings_provider = DefaultSettingsProvider {}; + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::SettlementClient, SettlementClient), + ConfigType::Actual => build_settlement_client(&settings_provider).await, + ConfigType::Dummy => Box::new(MockSettlementClient::new()), + } +} + +async fn init_prover_client(service: ConfigType) -> Box { + let settings_provider = DefaultSettingsProvider {}; + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::ProverClient, ProverClient), + ConfigType::Actual => build_prover_service(&settings_provider), + ConfigType::Dummy => Box::new(MockProverClient::new()), + } +} + +async fn init_alerts(service: ConfigType, aws_config: &SdkConfig) -> Box { + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::Alerts, Alerts), + ConfigType::Actual => build_alert_client(aws_config).await, + ConfigType::Dummy => Box::new(MockAlerts::new()), + } +} + +async fn init_storage_client(service: ConfigType) -> Box { + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::Storage, Storage), + ConfigType::Actual => { + let storage = get_storage_client().await; + match get_env_var_or_panic("DATA_STORAGE").as_str() { + "s3" => storage.as_ref().build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(), + _ => panic!("Unsupported Storage Client"), + } + storage + } + ConfigType::Dummy => Box::new(MockDataStorage::new()), + } +} + +async fn init_queue_client(service: ConfigType) -> Box { + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::Queue, Queue), + ConfigType::Actual => build_queue_client(), + ConfigType::Dummy => Box::new(MockQueueProvider::new()), + } +} + +async fn init_database(service: ConfigType) -> Box { + match service { + ConfigType::Mock(client) => impl_mock_match!(client, MockType::Database, Database), + ConfigType::Actual => build_database_client().await, + ConfigType::Dummy => Box::new(MockDatabase::new()), + } +} + +async fn init_starknet_client(service: ConfigType) -> (Arc>, Option) { + fn get_provider() -> (Arc>, Option) { + let server = MockServer::start(); + let port = server.port(); + let service = Arc::new(JsonRpcClient::new(HttpTransport::new( + Url::parse(format!("http://localhost:{}", port).as_str()).expect("Failed to parse URL"), + ))); + (service, Some(server)) + } + + match service { + ConfigType::Mock(client) => { + if let MockType::StarknetClient(starknet_client) = client { + (starknet_client, None) + } else { + panic!("Mock client is not a Starknet Client"); + } + } + ConfigType::Actual | ConfigType::Dummy => get_provider(), } } diff --git a/crates/orchestrator/src/tests/data_storage/mod.rs b/crates/orchestrator/src/tests/data_storage/mod.rs index a3055acb..968e1c61 100644 --- a/crates/orchestrator/src/tests/data_storage/mod.rs +++ b/crates/orchestrator/src/tests/data_storage/mod.rs @@ -1,10 +1,8 @@ -use crate::data_storage::aws_s3::config::AWSS3Config; -use crate::data_storage::aws_s3::AWSS3; -use crate::data_storage::{DataStorage, DataStorageConfig}; use bytes::Bytes; use rstest::rstest; use serde_json::json; -use utils::env_utils::get_env_var_or_panic; + +use crate::tests::config::{ConfigType, TestConfigBuilder}; /// This test checks the ability to put and get data from AWS S3 using `AWSS3`. /// It puts JSON data into a test bucket and retrieves it, verifying the data @@ -13,13 +11,11 @@ use utils::env_utils::get_env_var_or_panic; #[rstest] #[tokio::test] async fn test_put_and_get_data_s3() -> color_eyre::Result<()> { + let services = TestConfigBuilder::new().configure_storage_client(ConfigType::Actual).build().await; + dotenvy::from_filename("../.env.test")?; - let config = AWSS3Config::new_from_env(); - let aws_config = - aws_config::load_from_env().await.into_builder().endpoint_url(get_env_var_or_panic("AWS_ENDPOINT_URL")).build(); - let s3_client = AWSS3::new(config, &aws_config); - s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); + let s3_client = services.config.storage(); let mock_data = json!( { diff --git a/crates/orchestrator/src/tests/database/mod.rs b/crates/orchestrator/src/tests/database/mod.rs index 58866582..6c26a611 100644 --- a/crates/orchestrator/src/tests/database/mod.rs +++ b/crates/orchestrator/src/tests/database/mod.rs @@ -1,30 +1,23 @@ -use crate::config::{config, Config}; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::tests::config::TestConfigBuilder; -use arc_swap::Guard; use rstest::*; -use std::sync::Arc; use uuid::Uuid; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + #[rstest] #[tokio::test] async fn test_database_connection() -> color_eyre::Result<()> { - TestConfigBuilder::new().build().await; + let _services = TestConfigBuilder::new().build().await; Ok(()) } -#[fixture] -async fn get_config() -> Guard> { - config().await -} - /// Tests for `create_job` operation in database trait. /// Creates 3 jobs and asserts them. #[rstest] #[tokio::test] -async fn database_create_job_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_create_job_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -61,12 +54,9 @@ async fn database_create_job_works(#[future] get_config: Guard>) { #[case(true)] #[case(false)] #[tokio::test] -async fn database_get_jobs_without_successor_works( - #[future] get_config: Guard>, - #[case] is_successor: bool, -) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_jobs_without_successor_works(#[case] is_successor: bool) { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -108,9 +98,9 @@ async fn database_get_jobs_without_successor_works( /// - Should return the last successful job #[rstest] #[tokio::test] -async fn database_get_last_successful_job_by_type_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_last_successful_job_by_type_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -136,9 +126,9 @@ async fn database_get_last_successful_job_by_type_works(#[future] get_config: Gu /// - Should return the jobs after internal id #[rstest] #[tokio::test] -async fn database_get_jobs_after_internal_id_by_job_type_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_get_jobs_after_internal_id_by_job_type_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); let job_vec = [ @@ -171,9 +161,10 @@ async fn database_get_jobs_after_internal_id_by_job_type_works(#[future] get_con /// Happy Case : Creating a job with version 0 and updating the job with version 0 update only. #[rstest] #[tokio::test] -async fn database_update_job_status_passing_case_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_update_job_status_passing_case_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; + let database_client = config.database(); let job = build_job_item(JobType::SnosRun, JobStatus::Created, 1); @@ -189,9 +180,9 @@ async fn database_update_job_status_passing_case_works(#[future] get_config: Gua /// Failing Case : Creating a job with version 1 and updating the job with version 0 update only. #[rstest] #[tokio::test] -async fn database_update_job_status_failing_case_works(#[future] get_config: Guard>) { - TestConfigBuilder::new().build().await; - let config = get_config.await; +async fn database_update_job_status_failing_case_works() { + let services = TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await; + let config = services.config; let database_client = config.database(); // Scenario : diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 1390e46e..d1301513 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -1,20 +1,23 @@ -use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; -use crate::jobs::da_job::{DaError, DaJob}; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::jobs::JobError; -use crate::tests::common::drop_database; -use crate::tests::config::TestConfigBuilder; -use crate::{config::config, jobs::Job}; +use std::collections::HashMap; + use assert_matches::assert_matches; use color_eyre::eyre::eyre; -use da_client_interface::MockDaClient; use mockall::predicate::always; use rstest::rstest; use serde_json::json; use starknet_core::types::{FieldElement, MaybePendingStateUpdate, PendingStateUpdate, StateDiff}; -use std::collections::HashMap; use uuid::Uuid; +use da_client_interface::MockDaClient; + +use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; +use crate::jobs::da_job::{DaError, DaJob}; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::Job; +use crate::jobs::JobError; +use crate::tests::common::drop_database; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + /// Tests the DA Job's handling of a blob length exceeding the supported size. /// It mocks the DA client to simulate the environment and expects an error on job processing. /// Validates the error message for exceeding blob limits against the expected output. @@ -39,15 +42,19 @@ async fn test_da_job_process_job_failure_on_small_blob_size( da_client.expect_max_blob_per_txn().with().returning(|| 1); da_client.expect_max_bytes_per_blob().with().returning(|| 1200); - let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_starknet_client(ConfigType::Actual) + .configure_storage_client(ConfigType::Actual) + .configure_da_client(da_client.into()) + .build() + .await; let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading"); let state_update = MaybePendingStateUpdate::Update(state_update); let state_update = serde_json::to_value(&state_update).unwrap(); let response = json!({ "id": 640641,"jsonrpc":"2.0","result": state_update }); - + let server = services.server.unwrap(); get_nonce_attached(&server, nonces_file.as_str()); let state_update_mock = server.mock(|when, then| { @@ -55,11 +62,11 @@ async fn test_da_job_process_job_failure_on_small_blob_size( then.status(200).body(serde_json::to_vec(&response).unwrap()); }); - let max_blob_per_txn = config.da_client().max_blob_per_txn().await; + let max_blob_per_txn = services.config.da_client().max_blob_per_txn().await; let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), @@ -81,7 +88,7 @@ async fn test_da_job_process_job_failure_on_small_blob_size( ); state_update_mock.assert(); - let _ = drop_database().await; + // let _ = drop_database().await; } /// Tests DA Job processing failure when a block is in pending state. @@ -91,8 +98,12 @@ async fn test_da_job_process_job_failure_on_small_blob_size( #[rstest] #[tokio::test] async fn test_da_job_process_job_failure_on_pending_block() { - let server = TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_starknet_client(ConfigType::Actual) + .configure_da_client(ConfigType::Actual) + .build() + .await; + let server = services.server.unwrap(); let internal_id = "1"; let pending_state_update = MaybePendingStateUpdate::PendingUpdate(PendingStateUpdate { @@ -117,7 +128,7 @@ async fn test_da_job_process_job_failure_on_pending_block() { let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), @@ -176,8 +187,12 @@ async fn test_da_job_process_job_success( da_client.expect_max_blob_per_txn().with().returning(|| 6); da_client.expect_max_bytes_per_blob().with().returning(|| 131072); - let server = TestConfigBuilder::new().mock_da_client(Box::new(da_client)).build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_storage_client(ConfigType::Actual) + .configure_da_client(da_client.into()) + .build() + .await; + let server = services.server.unwrap(); let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading"); @@ -193,7 +208,7 @@ async fn test_da_job_process_job_success( let response = DaJob .process_job( - config.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: internal_id.to_string(), diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index d1748feb..78646599 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -1,38 +1,36 @@ -use rstest::rstest; - -use crate::config::config; -use crate::jobs::handle_job_failure; -use crate::jobs::types::JobType; -use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder}; - -use super::database::build_job_item; - -#[cfg(test)] -pub mod da_job; - -#[cfg(test)] -pub mod proving_job; - -#[cfg(test)] -pub mod state_update_job; - -use assert_matches::assert_matches; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use assert_matches::assert_matches; use mockall::predicate::eq; use mongodb::bson::doc; use omniqueue::QueueError; +use rstest::rstest; use tokio::time::sleep; use uuid::Uuid; use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; +use crate::jobs::handle_job_failure; use crate::jobs::job_handler_factory::mock_factory; +use crate::jobs::types::JobType; use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus}; use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob}; use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; use crate::tests::common::MessagePayloadType; +use crate::tests::config::ConfigType; +use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder}; + +use super::database::build_job_item; + +#[cfg(test)] +pub mod da_job; + +#[cfg(test)] +pub mod proving_job; + +#[cfg(test)] +pub mod state_update_job; /// Tests `create_job` function when job is not existing in the db. #[rstest] @@ -45,22 +43,25 @@ async fn create_job_job_does_not_exists_in_db_works() { let job_item_clone = job_item.clone(); job_handler.expect_create_job().times(1).returning(move |_, _, _| Ok(job_item_clone.clone())); - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; // Mocking the `get_job_handler` call in create_job function. let job_handler: Arc> = Arc::new(Box::new(job_handler)); let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).with(eq(JobType::SnosRun)).return_once(move |_| Arc::clone(&job_handler)); - assert!(create_job(JobType::SnosRun, "0".to_string(), HashMap::new()).await.is_ok()); + assert!(create_job(JobType::SnosRun, "0".to_string(), HashMap::new(), services.config.clone()).await.is_ok()); let mut hashmap: HashMap = HashMap::new(); hashmap.insert(JOB_PROCESS_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); hashmap.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); // Db checks. - let job_in_db = config.database().get_job_by_id(job_item.id).await.unwrap().unwrap(); + let job_in_db = services.config.database().get_job_by_id(job_item.id).await.unwrap().unwrap(); assert_eq!(job_in_db.id, job_item.id); assert_eq!(job_in_db.internal_id, job_item.internal_id); assert_eq!(job_in_db.metadata, hashmap); @@ -69,7 +70,8 @@ async fn create_job_job_does_not_exists_in_db_works() { sleep(Duration::from_secs(5)).await; // Queue checks. - let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_messages = + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -80,20 +82,25 @@ async fn create_job_job_does_not_exists_in_db_works() { async fn create_job_job_exists_in_db_works() { let job_item = build_job_item_by_type_and_status(JobType::ProofCreation, JobStatus::Created, "0".to_string()); - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); database_client.create_job(job_item).await.unwrap(); - assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()) + .await + .is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -103,21 +110,26 @@ async fn create_job_job_exists_in_db_works() { #[should_panic(expected = "Job type not implemented yet.")] #[tokio::test] async fn create_job_job_handler_is_not_implemented_panics() { - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; // Mocking the `get_job_handler` call in create_job function. let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).returning(|_| panic!("Job type not implemented yet.")); - assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new()).await.is_err()); + assert!(create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()) + .await + .is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -134,9 +146,12 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works let job_item = build_job_item_by_type_and_status(job_type.clone(), job_status.clone(), "1".to_string()); // Building config - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let database_client = services.config.database(); let mut job_handler = MockJob::new(); @@ -151,7 +166,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).with(eq(job_type.clone())).returning(move |_| Arc::clone(&job_handler)); - assert!(process_job(job_item.id).await.is_ok()); + assert!(process_job(job_item.id, services.config.clone()).await.is_ok()); // Getting the updated job. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); // checking if job_status is updated in db @@ -164,7 +179,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works // Queue checks let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -178,14 +193,17 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Completed, "1".to_string()); // building config - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let database_client = services.config.database(); // creating job in database database_client.create_job(job_item.clone()).await.unwrap(); - assert!(process_job(job_item.id).await.is_err()); + assert!(process_job(job_item.id, services.config.clone()).await.is_err()); let job_in_db = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); // Job should be untouched in db. @@ -196,7 +214,7 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -209,17 +227,20 @@ async fn process_job_job_does_not_exists_in_db_works() { let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); // building config - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - assert!(process_job(job_item.id).await.is_err()); + assert!(process_job(job_item.id, services.config.clone()).await.is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -241,18 +262,24 @@ async fn process_job_two_workers_process_same_job_works() { ctx.expect().times(1).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler)); // building config - TestConfigBuilder::new().build().await; - let config = config().await; - let db_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + let db_client = services.config.database(); let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string()); // Creating the job in the db db_client.create_job(job_item.clone()).await.unwrap(); - // Simulating the two workers - let worker_1 = tokio::spawn(async move { process_job(job_item.id).await }); - let worker_2 = tokio::spawn(async move { process_job(job_item.id).await }); + let config_1 = services.config.clone(); + let config_2 = services.config.clone(); + + // Simulating the two workers, Uuid has in-built copy trait + let worker_1 = tokio::spawn(async move { process_job(job_item.id, config_1).await }); + let worker_2 = tokio::spawn(async move { process_job(job_item.id, config_2).await }); // waiting for workers to complete the processing let (result_1, result_2) = tokio::join!(worker_1, worker_2); @@ -279,10 +306,13 @@ async fn verify_job_with_verified_status_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -296,7 +326,7 @@ async fn verify_job_with_verified_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -307,10 +337,10 @@ async fn verify_job_with_verified_status_works() { // Queue checks. let consumed_messages_verification_queue = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); let consumed_messages_processing_queue = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -323,10 +353,13 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -339,7 +372,7 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -349,7 +382,8 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { sleep(Duration::from_secs(5)).await; // Queue checks. - let consumed_messages = config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_messages = + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -368,10 +402,13 @@ async fn verify_job_with_rejected_status_works() { job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -385,7 +422,7 @@ async fn verify_job_with_rejected_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -397,7 +434,7 @@ async fn verify_job_with_rejected_status_works() { // Queue checks. let consumed_messages_processing_queue = - config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -410,10 +447,13 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -428,7 +468,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -440,7 +480,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { // Queue checks let consumed_messages = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -459,10 +499,13 @@ async fn verify_job_with_pending_status_works() { job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; - let config = config().await; - let database_client = config.database(); + let database_client = services.config.database(); let mut job_handler = MockJob::new(); // creating job in database @@ -477,7 +520,7 @@ async fn verify_job_with_pending_status_works() { // Mocking the `get_job_handler` call in create_job function. ctx.expect().times(1).with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler)); - assert!(verify_job(job_item.id).await.is_ok()); + assert!(verify_job(job_item.id, services.config.clone()).await.is_ok()); // DB checks. let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap(); @@ -489,7 +532,7 @@ async fn verify_job_with_pending_status_works() { // Queue checks. let consumed_messages_verification_queue = - config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); } @@ -518,9 +561,13 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i #[case(JobType::DataSubmission, JobStatus::VerificationFailed)] #[tokio::test] async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = services.config.database(); let internal_id = 1; // create a job, with already available "last_job_status" @@ -535,9 +582,10 @@ async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobTy database_client.create_job(job_expected.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + handle_job_failure(job_id, services.config.clone()).await.expect("handle_job_failure failed to run"); - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); assert_eq!(job_fetched, job_expected); } @@ -547,9 +595,13 @@ async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobTy #[case::verification_timeout(JobType::SnosRun, JobStatus::VerificationTimeout)] #[tokio::test] async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) { - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = services.config.database(); let internal_id = 1; // create a job @@ -560,9 +612,10 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT database_client.create_job(job.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("handle_job_failure failed to run"); + handle_job_failure(job_id, services.config.clone()).await.expect("handle_job_failure failed to run"); - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); // creating expected output let mut job_expected = job.clone(); @@ -581,9 +634,13 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType) { let job_status = JobStatus::Completed; - TestConfigBuilder::new().build().await; - let config = config().await; - let database_client = config.database(); + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + let database_client = services.config.database(); let internal_id = 1; // create a job @@ -594,10 +651,13 @@ async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType database_client.create_job(job_expected.clone()).await.unwrap(); // calling handle_job_failure - handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed."); + handle_job_failure(job_id, services.config.clone()) + .await + .expect("Test call to handle_job_failure should have passed."); // The completed job status on db is untouched. - let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); + let job_fetched = + services.config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap(); assert_eq!(job_fetched, job_expected); } diff --git a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs index d35b2670..335a9e9b 100644 --- a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs @@ -3,9 +3,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::Read; use std::path::Path; -use std::sync::Arc; -use crate::config::config; use crate::data_storage::MockDataStorage; use httpmock::prelude::*; use mockall::predicate::eq; @@ -25,10 +23,9 @@ use crate::tests::config::TestConfigBuilder; #[rstest] #[tokio::test] async fn test_create_job() { - TestConfigBuilder::new().build().await; - let config = config().await; + let services = TestConfigBuilder::new().build().await; - let job = ProvingJob.create_job(&config, String::from("0"), HashMap::new()).await; + let job = ProvingJob.create_job(services.config.clone(), String::from("0"), HashMap::new()).await; assert!(job.is_ok()); let job = job.unwrap(); @@ -47,10 +44,9 @@ async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) { let mut prover_client = MockProverClient::new(); prover_client.expect_get_task_status().times(1).returning(|_| Ok(TaskStatus::Succeeded)); - TestConfigBuilder::new().mock_prover_client(Box::new(prover_client)).build().await; + let services = TestConfigBuilder::new().configure_prover_client(prover_client.into()).build().await; - let config = config().await; - assert!(ProvingJob.verify_job(&config, &mut job_item).await.is_ok()); + assert!(ProvingJob.verify_job(services.config, &mut job_item).await.is_ok()); } #[rstest] @@ -73,17 +69,17 @@ async fn test_process_job() { let buffer_bytes = Bytes::from(buffer); storage.expect_get_data().with(eq("0/pie.zip")).return_once(move |_| Ok(buffer_bytes)); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_prover_client(Box::new(prover_client)) - .mock_storage_client(Box::new(storage)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_prover_client(prover_client.into()) + .configure_storage_client(storage.into()) .build() .await; assert_eq!( ProvingJob .process_job( - config().await.as_ref(), + services.config, &mut JobItem { id: Uuid::default(), internal_id: "0".into(), diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index dfd40a24..fbb7604b 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -1,19 +1,20 @@ use std::collections::HashMap; use std::fs; use std::path::PathBuf; -use std::sync::Arc; use assert_matches::assert_matches; use bytes::Bytes; +use color_eyre::eyre::eyre; use httpmock::prelude::*; +use lazy_static::lazy_static; use mockall::predicate::{always, eq}; use rstest::*; -use settlement_client_interface::MockSettlementClient; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; +use url::Url; -use color_eyre::eyre::eyre; -use utils::env_utils::get_env_var_or_panic; +use settlement_client_interface::MockSettlementClient; -use crate::config::config; use crate::constants::{BLOB_DATA_FILE_NAME, SNOS_OUTPUT_FILE_NAME}; use crate::data_storage::MockDataStorage; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO; @@ -25,12 +26,8 @@ use crate::jobs::state_update_job::utils::hex_string_to_u8_vec; use crate::jobs::state_update_job::{StateUpdateError, StateUpdateJob}; use crate::jobs::types::{JobStatus, JobType}; use crate::jobs::{Job, JobError}; -use crate::tests::common::{default_job_item, get_storage_client}; +use crate::tests::common::default_job_item; use crate::tests::config::TestConfigBuilder; -use lazy_static::lazy_static; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::JsonRpcClient; -use url::Url; lazy_static! { pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().unwrap(); @@ -43,12 +40,12 @@ pub const X_0_FILE_NAME: &str = "x_0.txt"; #[rstest] #[tokio::test] async fn test_process_job_attempt_not_present_fails() { - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new().build().await; let mut job = default_job_item(); - let config = config().await; + let state_update_job = StateUpdateJob {}; - let res = state_update_job.process_job(&config, &mut job).await.unwrap_err(); + let res = state_update_job.process_job(services.config, &mut job).await.unwrap_err(); assert_eq!(res, JobError::StateUpdateJobError(StateUpdateError::AttemptNumberNotFound)); } @@ -63,9 +60,8 @@ async fn test_process_job_works( ) { // Will be used by storage client which we call while storing the data. - use num::ToPrimitive; + use crate::tests::config::ConfigType; - use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); // Mocking the settlement client. @@ -76,21 +72,17 @@ async fn test_process_job_works( // This must be the last block number and should be returned as an output from the process job. let last_block_number = block_numbers[block_numbers.len() - 1]; - // Storing `blob_data` and `snos_output` in storage client - store_data_in_storage_client_for_s3(block_numbers.clone()).await; - - // Building a temp config that will be used by `fetch_blob_data_for_block` and `fetch_snos_for_block` - // functions while fetching the blob data from storage client. - TestConfigBuilder::new().build().await; - - // test_process_job_works uses nonce just to write expect_update_state_with_blobs for a mocked settlement client, - // which means that nonce ideally is never checked against, hence supplying any `u64` `nonce` works. - let nonce: u64 = 3; - settlement_client.expect_get_nonce().with().returning(move || Ok(nonce)); - // Adding expectations for each block number to be called by settlement client. for block in block_numbers.iter().skip(processing_start_index as usize) { - let blob_data = fetch_blob_data_for_block(block.to_u64().unwrap()).await.unwrap(); + // Getting the blob data from file. + let blob_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)), + ) + .unwrap(); + + let blob_data_vec = vec![hex_string_to_u8_vec(&blob_data).unwrap()]; + + let blob_data = blob_data_vec; settlement_client .expect_update_state_with_blobs() .with(eq(vec![]), eq(blob_data), always()) @@ -98,9 +90,39 @@ async fn test_process_job_works( .returning(|_, _, _| Ok("0xbeef".to_string())); } settlement_client.expect_get_last_settled_block().with().returning(move || Ok(651052)); + // Setting random nonce + settlement_client.expect_get_nonce().with().returning(move || Ok(2)); - // Building new config with mocked settlement client - TestConfigBuilder::new().mock_settlement_client(Box::new(settlement_client)).build().await; + // Building a temp config that will be used by `fetch_blob_data_for_block` and `fetch_snos_for_block` + // functions while fetching the blob data from storage client. + let services = TestConfigBuilder::new() + .configure_storage_client(ConfigType::Actual) + .configure_settlement_client(settlement_client.into()) + .build() + .await; + + let storage_client = services.config.storage(); + + for block in block_numbers { + // Getting the blob data from file. + let blob_data_key = block.to_owned().to_string() + "/" + BLOB_DATA_FILE_NAME; + let blob_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)), + ) + .unwrap(); + let blob_data_vec = vec![hex_string_to_u8_vec(&blob_data).unwrap()]; + let blob_serialized = bincode::serialize(&blob_data_vec).unwrap(); + + // Getting the snos data from file. + let snos_output_key = block.to_owned().to_string() + "/" + SNOS_OUTPUT_FILE_NAME; + let snos_output_data = fs::read_to_string( + CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, SNOS_OUTPUT_FILE_NAME)), + ) + .unwrap(); + + storage_client.put_data(Bytes::from(snos_output_data), &snos_output_key).await.unwrap(); + storage_client.put_data(Bytes::from(blob_serialized), &blob_data_key).await.unwrap(); + } // setting last failed block number as 651053. // setting blocks yet to process as 651054 and 651055. @@ -117,9 +139,8 @@ async fn test_process_job_works( job.job_type = JobType::StateTransition; job.metadata = metadata; - let config = config().await; let state_update_job = StateUpdateJob {}; - let res = state_update_job.process_job(&config, &mut job).await.unwrap(); + let res = state_update_job.process_job(services.config, &mut job).await.unwrap(); assert_eq!(res, last_block_number.to_string()); } @@ -128,18 +149,16 @@ async fn test_process_job_works( #[rstest] #[tokio::test] async fn create_job_works() { - TestConfigBuilder::new().build().await; + let services = TestConfigBuilder::new().build().await; - let config = config().await; - - let job = StateUpdateJob.create_job(&config, String::from("0"), HashMap::default()).await; + let job = StateUpdateJob.create_job(services.config, String::from("0"), HashMap::default()).await; assert!(job.is_ok()); let job = job.unwrap(); let job_type = job.job_type; assert_eq!(job_type, JobType::StateTransition, "job_type should be StateTransition"); - assert!(!(job.id.is_nil()), "id should not be nil"); + assert!(!job.id.is_nil(), "id should not be nil"); assert_eq!(job.status, JobStatus::Created, "status should be Created"); assert_eq!(job.version, 0_i32, "version should be 0"); assert_eq!(job.external_id.unwrap_string().unwrap(), String::new(), "external_id should be empty string"); @@ -201,9 +220,9 @@ async fn process_job_works() { .returning(|_, _, _| Ok(String::from("0x5d17fac98d9454030426606019364f6e68d915b91f6210ef1e2628cd6987442"))); } - TestConfigBuilder::new() - .mock_settlement_client(Box::new(settlement_client)) - .mock_storage_client(Box::new(storage_client)) + let services = TestConfigBuilder::new() + .configure_settlement_client(settlement_client.into()) + .configure_storage_client(storage_client.into()) .build() .await; @@ -213,8 +232,8 @@ async fn process_job_works() { metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0")); let mut job = - StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap(); - assert_eq!(StateUpdateJob.process_job(config().await.as_ref(), &mut job).await.unwrap(), "651056".to_string()) + StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap(); + assert_eq!(StateUpdateJob.process_job(services.config, &mut job).await.unwrap(), "651056".to_string()) } #[rstest] @@ -232,19 +251,19 @@ async fn process_job_invalid_inputs_errors(#[case] block_numbers_to_settle: Stri Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), )); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_settlement_client(Box::new(settlement_client)) + let services = TestConfigBuilder::new() + .configure_settlement_client(settlement_client.into()) + .configure_starknet_client(provider.into()) .build() .await; - let config = config().await; let mut metadata: HashMap = HashMap::new(); metadata.insert(String::from(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY), block_numbers_to_settle); metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0")); - let mut job = StateUpdateJob.create_job(&config, String::from("internal_id"), metadata).await.unwrap(); - let status = StateUpdateJob.process_job(&config, &mut job).await; + let mut job = + StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap(); + let status = StateUpdateJob.process_job(services.config, &mut job).await; assert!(status.is_err()); if let Err(error) = status { @@ -269,9 +288,9 @@ async fn process_job_invalid_input_gap_panics() { Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), )); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_settlement_client(Box::new(settlement_client)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_settlement_client(settlement_client.into()) .build() .await; @@ -280,8 +299,8 @@ async fn process_job_invalid_input_gap_panics() { metadata.insert(String::from(JOB_PROCESS_ATTEMPT_METADATA_KEY), String::from("0")); let mut job = - StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap(); - let response = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await; + StateUpdateJob.create_job(services.config.clone(), String::from("internal_id"), metadata).await.unwrap(); + let response = StateUpdateJob.process_job(services.config, &mut job).await; assert_matches!(response, Err(e) => { @@ -303,32 +322,6 @@ async fn load_state_diff_file(block_no: u64) -> Vec> { state_diff_vec } -async fn store_data_in_storage_client_for_s3(block_numbers: Vec) { - let storage_client = get_storage_client().await; - storage_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); - - for block in block_numbers { - // Getting the blob data from file. - let blob_data_key = block.to_owned().to_string() + "/" + BLOB_DATA_FILE_NAME; - let blob_data = fs::read_to_string( - CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, BLOB_DATA_FILE_NAME)), - ) - .unwrap(); - let blob_data_vec = vec![hex_string_to_u8_vec(&blob_data).unwrap()]; - let blob_serialized = bincode::serialize(&blob_data_vec).unwrap(); - - // Getting the snos data from file. - let snos_output_key = block.to_owned().to_string() + "/" + SNOS_OUTPUT_FILE_NAME; - let snos_output_data = fs::read_to_string( - CURRENT_PATH.join(format!("src/tests/jobs/state_update_job/test_data/{}/{}", block, SNOS_OUTPUT_FILE_NAME)), - ) - .unwrap(); - - storage_client.put_data(Bytes::from(snos_output_data), &snos_output_key).await.unwrap(); - storage_client.put_data(Bytes::from(blob_serialized), &blob_data_key).await.unwrap(); - } -} - fn parse_block_numbers(blocks_to_settle: &str) -> color_eyre::Result> { let sanitized_blocks = blocks_to_settle.replace(' ', ""); let block_numbers: Vec = sanitized_blocks diff --git a/crates/orchestrator/src/tests/server/mod.rs b/crates/orchestrator/src/tests/server/mod.rs index 33b7f657..670a8933 100644 --- a/crates/orchestrator/src/tests/server/mod.rs +++ b/crates/orchestrator/src/tests/server/mod.rs @@ -1,6 +1,5 @@ use std::io::Read; use std::net::SocketAddr; -use std::sync::Arc; use axum::http::StatusCode; use hyper::body::Buf; @@ -21,7 +20,7 @@ pub async fn setup_server() -> SocketAddr { Url::parse("http://localhost:9944".to_string().as_str()).expect("Failed to parse URL"), )); - TestConfigBuilder::new().mock_starknet_client(Arc::new(provider)).build().await; + TestConfigBuilder::new().configure_starknet_client(provider.into()).build().await; let host = get_env_var_or_default("HOST", "127.0.0.1"); let port = get_env_var_or_default("PORT", "3000").parse::().expect("PORT must be a u16"); @@ -62,5 +61,6 @@ async fn test_health_endpoint(#[future] setup_server: SocketAddr) { #[rstest] #[tokio::test] async fn test_init_consumer() { - assert!(init_consumers().await.is_ok()); + let services = TestConfigBuilder::new().build().await; + assert!(init_consumers(services.config).await.is_ok()); } diff --git a/crates/orchestrator/src/tests/workers/proving/mod.rs b/crates/orchestrator/src/tests/workers/proving/mod.rs index 22f4a36c..05051e54 100644 --- a/crates/orchestrator/src/tests/workers/proving/mod.rs +++ b/crates/orchestrator/src/tests/workers/proving/mod.rs @@ -29,7 +29,7 @@ async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), )); - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_db_client(Box::new(db)) - .mock_queue(Box::new(queue)) - .mock_da_client(Box::new(da_client)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_database(db.into()) + .configure_queue_client(queue.into()) + .configure_da_client(da_client.into()) .build() .await; @@ -110,7 +110,7 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { }); let snos_worker = SnosWorker {}; - snos_worker.run_worker().await?; + snos_worker.run_worker(services.config).await?; rpc_block_call_mock.assert(); diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index a15cc62a..58fc68ba 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -70,12 +70,10 @@ async fn test_update_state_worker( // creation) let completed_jobs = get_job_by_mock_id_vector(JobType::ProofCreation, JobStatus::Completed, number_of_processed_jobs as u64, 2); - for job in completed_jobs { - db.expect_get_job_by_internal_id_and_type() - .times(1) - .with(eq(job.internal_id.to_string()), eq(JobType::StateTransition)) - .returning(|_, _| Ok(None)); - } + db.expect_get_job_by_internal_id_and_type() + .times(1) + .with(eq(completed_jobs[0].internal_id.to_string()), eq(JobType::StateTransition)) + .returning(|_, _| Ok(None)); // mocking the creation of jobs let job_item = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()); @@ -107,16 +105,16 @@ async fn test_update_state_worker( )); // mock block number (madara) : 5 - TestConfigBuilder::new() - .mock_starknet_client(Arc::new(provider)) - .mock_db_client(Box::new(db)) - .mock_queue(Box::new(queue)) - .mock_da_client(Box::new(da_client)) + let services = TestConfigBuilder::new() + .configure_starknet_client(provider.into()) + .configure_database(db.into()) + .configure_queue_client(queue.into()) + .configure_da_client(da_client.into()) .build() .await; let update_state_worker = UpdateStateWorker {}; - update_state_worker.run_worker().await?; + update_state_worker.run_worker(services.config).await?; Ok(()) } diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index 3c2d4331..23e43770 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -1,10 +1,11 @@ -use crate::config::config; +use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; use async_trait::async_trait; use std::collections::HashMap; use std::error::Error; +use std::sync::Arc; pub struct DataSubmissionWorker; @@ -14,9 +15,7 @@ impl Worker for DataSubmissionWorker { // 1. Fetch the latest completed Proving job. // 2. Fetch the latest DA job creation. // 3. Create jobs from after the lastest DA job already created till latest completed proving job. - async fn run_worker(&self) -> Result<(), Box> { - let config = config().await; - + async fn run_worker(&self, config: Arc) -> Result<(), Box> { // provides latest completed proof creation job id let latest_proven_job_id = config .database() @@ -40,7 +39,7 @@ impl Worker for DataSubmissionWorker { // creating data submission jobs for latest blocks that don't have existing data submission jobs yet. for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 { - create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?; + create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new(), config.clone()).await?; } Ok(()) diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 785b1e2d..7934d8bb 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,6 +1,6 @@ -use crate::{config::config, jobs::types::JobStatus}; +use crate::{config::Config, jobs::types::JobStatus}; use async_trait::async_trait; -use std::error::Error; +use std::{error::Error, sync::Arc}; pub mod data_submission_worker; pub mod proof_registration; @@ -10,14 +10,14 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { - async fn run_worker_if_enabled(&self) -> Result<(), Box> { - if !self.is_worker_enabled().await? { + async fn run_worker_if_enabled(&self, config: Arc) -> Result<(), Box> { + if !self.is_worker_enabled(config.clone()).await? { return Ok(()); } - self.run_worker().await + self.run_worker(config).await } - async fn run_worker(&self) -> Result<(), Box>; + async fn run_worker(&self, config: Arc) -> Result<(), Box>; // Assumption // If say a job for block X fails, we don't want the worker to respawn another job for the same block @@ -29,9 +29,7 @@ pub trait Worker: Send + Sync { // Checks if any of the jobs have failed // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed // Halts any new job creation till all the count of failed jobs is not Zero. - async fn is_worker_enabled(&self) -> Result> { - let config = config().await; - + async fn is_worker_enabled(&self, config: Arc) -> Result> { let failed_jobs = config .database() .get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1)) diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs index ea6e5544..abf6da53 100644 --- a/crates/orchestrator/src/workers/proof_registration.rs +++ b/crates/orchestrator/src/workers/proof_registration.rs @@ -1,8 +1,8 @@ -use std::error::Error; +use std::{error::Error, sync::Arc}; use async_trait::async_trait; -use crate::workers::Worker; +use crate::{config::Config, workers::Worker}; pub struct ProofRegistrationWorker; @@ -11,7 +11,7 @@ impl Worker for ProofRegistrationWorker { /// 1. Fetch all blocks with a successful proving job run /// 2. Group blocks that have the same proof /// 3. For each group, create a proof registration job with from and to block in metadata - async fn run_worker(&self) -> Result<(), Box> { + async fn run_worker(&self, _config: Arc) -> Result<(), Box> { todo!() } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 4ec85b91..e263b988 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,9 +1,10 @@ -use crate::config::config; +use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; use async_trait::async_trait; use std::error::Error; +use std::sync::Arc; pub struct ProvingWorker; @@ -11,15 +12,14 @@ pub struct ProvingWorker; impl Worker for ProvingWorker { /// 1. Fetch all successful SNOS job runs that don't have a proving job /// 2. Create a proving job for each SNOS job run - async fn run_worker(&self) -> Result<(), Box> { - let config = config().await; + async fn run_worker(&self, config: Arc) -> Result<(), Box> { let successful_snos_jobs = config .database() .get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation) .await?; for job in successful_snos_jobs { - create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata).await? + create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await? } Ok(()) diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 7508263a..69529a77 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; use std::error::Error; +use std::sync::Arc; use async_trait::async_trait; use starknet::providers::Provider; -use crate::config::config; +use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; use crate::workers::Worker; @@ -16,8 +17,7 @@ impl Worker for SnosWorker { /// 1. Fetch the latest completed block from the Starknet chain /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks - async fn run_worker(&self) -> Result<(), Box> { - let config = config().await; + async fn run_worker(&self, config: Arc) -> Result<(), Box> { let provider = config.starknet_client(); let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config @@ -38,7 +38,7 @@ impl Worker for SnosWorker { } for x in latest_block_processed + 1..latest_block_number + 1 { - create_job(JobType::SnosRun, x.to_string(), HashMap::new()).await?; + create_job(JobType::SnosRun, x.to_string(), HashMap::new(), config.clone()).await?; } Ok(()) diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 6f71f1ac..f4821aca 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -1,8 +1,9 @@ use std::error::Error; +use std::sync::Arc; use async_trait::async_trait; -use crate::config::config; +use crate::config::Config; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::create_job; use crate::jobs::types::{JobItem, JobStatus, JobType}; @@ -15,8 +16,7 @@ impl Worker for UpdateStateWorker { /// 1. Fetch the last successful state update job /// 2. Fetch all successful proving jobs covering blocks after the last state update /// 3. Create state updates for all the blocks that don't have a state update job - async fn run_worker(&self) -> Result<(), Box> { - let config = config().await; + async fn run_worker(&self, config: Arc) -> Result<(), Box> { let latest_successful_job = config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?; @@ -40,7 +40,13 @@ impl Worker for UpdateStateWorker { ); // Creating a single job for all the pending blocks. - create_job(JobType::StateTransition, successful_proving_jobs[0].internal_id.clone(), metadata).await?; + create_job( + JobType::StateTransition, + successful_proving_jobs[0].internal_id.clone(), + metadata, + config.clone(), + ) + .await?; Ok(()) }