diff --git a/Cargo.lock b/Cargo.lock index 31f01a43..9b3b91ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3652,6 +3652,7 @@ name = "orchestrator" version = "0.1.0" dependencies = [ "arc-swap", + "async-std", "async-trait", "axum 0.7.5", "axum-macros", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 51271861..528582a1 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -34,6 +34,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } +async-std = "1.12.0" [features] default = ["ethereum", "with_mongodb", "with_sqs"] diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index ae051709..ef05d9eb 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -95,6 +95,7 @@ pub async fn config() -> Guard> { /// However, when running tests, we often want to reinitialize because we want to clear the DB and /// set it up again for reuse in new tests. By calling `config_force_init` we replace the already /// stored config inside `ArcSwap` with the new configuration and pool settings. +#[cfg(test)] pub async fn config_force_init(config: Config) { match CONFIG.get() { Some(arc) => arc.store(Arc::new(config)), diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index beff0903..8432c293 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -1,5 +1,4 @@ use crate::jobs::types::{JobItem, JobStatus, JobType}; -use ::mongodb::Cursor; use async_trait::async_trait; use color_eyre::Result; use mockall::automock; @@ -35,7 +34,6 @@ pub trait Database: Send + Sync { async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; async fn get_latest_job_by_type(&self, job_type: JobType) -> Result>; - async fn get_all_jobs(&self, job_type: JobType) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 1a4a7215..7e698ceb 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -9,7 +9,7 @@ use mongodb::options::{FindOneOptions, UpdateOptions}; use mongodb::{ bson::doc, options::{ClientOptions, ServerApi, ServerApiVersion}, - Client, Collection, Cursor, + Client, Collection, }; use std::collections::HashMap; use uuid::Uuid; @@ -127,11 +127,4 @@ impl Database for MongoDb { .await .expect("Failed to fetch latest job by given job type")) } - - async fn get_all_jobs(&self, job_type: JobType) -> Result> { - let filter = doc! { - "job_type": mongodb::bson::to_bson(&job_type)?, - }; - Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type")) - } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 8a06ceb3..5afb4849 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -170,7 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { fn get_job_handler(job_type: &JobType) -> Box { match job_type { JobType::DataSubmission => Box::new(da_job::DaJob), - JobType::SnosRun => Box::new(da_job::DaJob), + JobType::SnosRun => Box::new(snos_job::SnosJob), _ => unimplemented!("Job type not implemented yet."), } } diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index bcd8556b..7640ccad 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use uuid::Uuid; /// An external id. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(untagged)] pub enum ExternalId { /// A string. @@ -98,7 +98,7 @@ pub enum JobStatus { VerificationFailed, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct JobItem { /// an uuid to identify a job #[serde(with = "uuid_1_as_binary")] diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index c8a031d8..4df82c94 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -40,7 +40,7 @@ async fn main() { async fn start_cron(worker: Box, interval: u64) { loop { - worker.run_worker().await; + worker.run_worker().await.expect("Error in running the worker."); tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; } } diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 81166995..cbb18095 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -8,6 +8,7 @@ use serde_json::json; use super::super::common::{default_job_item, init_config}; use starknet_core::types::{FieldElement, MaybePendingStateUpdate, StateDiff}; +use tracing::log; use uuid::Uuid; use crate::jobs::types::ExternalId; @@ -55,6 +56,8 @@ async fn test_process_job() { da_client.expect_publish_state_diff().times(1).returning(|_| Ok(internal_id.to_string())); let config = init_config(Some(format!("http://localhost:{}", server.port())), None, None, Some(da_client)).await; + log::info!("this is the port {}", server.port()); + let state_update = MaybePendingStateUpdate::Update(StateUpdate { block_hash: FieldElement::default(), new_root: FieldElement::default(), diff --git a/crates/orchestrator/src/tests/workers/mod.rs b/crates/orchestrator/src/tests/workers/mod.rs index 0c990586..fc2b53c4 100644 --- a/crates/orchestrator/src/tests/workers/mod.rs +++ b/crates/orchestrator/src/tests/workers/mod.rs @@ -1,63 +1,79 @@ use crate::config::config_force_init; use crate::database::MockDatabase; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; -use crate::queue::job_queue::JobQueueMessage; use crate::queue::MockQueueProvider; use crate::tests::common::init_config; use crate::workers::snos::SnosWorker; use crate::workers::Worker; use da_client_interface::MockDaClient; use httpmock::MockServer; +use mockall::predicate::eq; use rstest::rstest; use serde_json::json; use std::collections::HashMap; +use std::error::Error; use uuid::Uuid; #[rstest] +#[case(false)] +#[case(true)] #[tokio::test] -async fn test_create_job() { +async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { let server = MockServer::start(); let da_client = MockDaClient::new(); let mut db = MockDatabase::new(); let mut queue = MockQueueProvider::new(); + let start_job_index; + let block; - // Mocking db functions - db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call."); - // Getting jobs for check expectations - for i in 1..6 { - db.expect_get_job_by_internal_id_and_type() - .returning(|_, _| Ok(None)) - .call(&i.to_string(), &JobType::SnosRun) - .expect("Failed to call."); + const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; + + // Mocking db function expectations + if !db_val { + db.expect_get_latest_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + start_job_index = 1; + block = 5; + } else { + let uuid_temp = Uuid::new_v4(); + + db.expect_get_latest_job_by_type() + .with(eq(JobType::SnosRun)) + .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp.clone())))); + block = 6; + start_job_index = 2; } - // Creating jobs expectations - for i in 1..6 { + for i in start_job_index..block + 1 { + // Getting jobs for check expectations + db.expect_get_job_by_internal_id_and_type() + .times(1) + .with(eq(i.clone().to_string()), eq(JobType::SnosRun)) + .returning(|_, _| Ok(None)); + + let uuid = Uuid::new_v4(); + + // creating jobs call expectations db.expect_create_job() - .returning(|_| Ok(get_job_item_mock_by_id("1".to_string()))) - .call(get_job_item_mock_by_id(i.to_string())) - .expect("Failed to call"); + .times(1) + .withf(move |item| item.internal_id == i.clone().to_string()) + .returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid.clone()))); } // Queue function call simulations - queue - .expect_send_message_to_queue() - .returning(|_, _, _| Ok(())) - .call( - "madara_orchestrator_job_processing_queue".to_string(), - serde_json::to_string(&JobQueueMessage { id: Uuid::new_v4() }).unwrap(), - None, - ) - .expect("Failed to call"); + queue.expect_send_message_to_queue().returning(|_, _, _| Ok(())).withf( + |queue, _payload, _delay| { + queue == JOB_PROCESSING_QUEUE + } + ); // mock block number (madara) : 5 - let rpc_response_block_number = 5; + let rpc_response_block_number = block; let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number }); - let config = init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await; config_force_init(config).await; + // mocking block call let rpc_block_call_mock = server.mock(|when, then| { when.path("/").body_contains("starknet_blockNumber"); @@ -65,14 +81,14 @@ async fn test_create_job() { }); let snos_worker = SnosWorker {}; - snos_worker.run_worker().await; + snos_worker.run_worker().await?; rpc_block_call_mock.assert(); -} -fn get_job_item_mock_by_id(id: String) -> JobItem { - let uuid = Uuid::new_v4(); + Ok(()) +} +fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem { JobItem { id: uuid, internal_id: id.clone(), diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 846d0c8c..7d198c0c 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use std::error::Error; pub mod proof_registration; pub mod proving; @@ -7,5 +8,5 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { - async fn run_worker(&self); + async fn run_worker(&self) -> Result<(), Box>; } diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs index 5ad5bc2d..a51bd412 100644 --- a/crates/orchestrator/src/workers/proof_registration.rs +++ b/crates/orchestrator/src/workers/proof_registration.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct ProofRegistrationWorker; @@ -8,7 +9,7 @@ impl Worker for ProofRegistrationWorker { /// 1. Fetch all blocks with a successful proving job run /// 2. Group blocks that have the same proof /// 3. For each group, create a proof registration job with from and to block in metadata - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 9476ea71..61bc19c2 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct ProvingWorker; @@ -7,7 +8,7 @@ pub struct ProvingWorker; impl Worker for ProvingWorker { /// 1. Fetch all successful SNOS job runs that don't have a proving job /// 2. Create a proving job for each SNOS job run - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } } diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 43575043..6853c6cb 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -5,7 +5,7 @@ use crate::workers::Worker; use async_trait::async_trait; use starknet::providers::Provider; use std::collections::HashMap; -use tracing::log; +use std::error::Error; pub struct SnosWorker; @@ -14,11 +14,10 @@ impl Worker for SnosWorker { /// 1. Fetch the latest completed block from the Starknet chain /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks - // TEST : added config temporarily to test - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { let config = config().await; let provider = config.starknet_client(); - let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc"); + let latest_block_number = provider.block_number().await?; let latest_block_processed_data = config .database() .get_latest_job_by_type(JobType::SnosRun) @@ -27,22 +26,19 @@ impl Worker for SnosWorker { .map(|item| item.internal_id) .unwrap_or("0".to_string()); - let latest_block_processed: u64 = - latest_block_processed_data.parse().expect("Failed to convert block number from JobItem into u64"); + let latest_block_processed: u64 = latest_block_processed_data.parse()?; let block_diff = latest_block_number - latest_block_processed; // if all blocks are processed if block_diff == 0 { - return; + return Ok(()); } for x in latest_block_processed + 1..latest_block_number + 1 { - create_job(JobType::SnosRun, x.to_string(), HashMap::new()) - .await - .expect("Error : failed to create job for snos workers."); + create_job(JobType::SnosRun, x.to_string(), HashMap::new()).await?; } - log::info!("jobs created !!"); + Ok(()) } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index c359e99e..faca16ab 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct UpdateStateWorker; @@ -8,7 +9,7 @@ impl Worker for UpdateStateWorker { /// 1. Fetch the last succesful state update job /// 2. Fetch all succesful proving jobs covering blocks after the last state update /// 3. Create state updates for all the blocks that don't have a state update job - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } }