From 2e9cf16c363855447778352e501302428b96b585 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 13 Jun 2024 19:21:09 +0530 Subject: [PATCH] feat : added snos worker implementation and unit tests --- .github/workflows/coverage.yml | 2 +- Cargo.lock | 9 +- Cargo.toml | 1 + crates/orchestrator/Cargo.toml | 1 + crates/orchestrator/src/config.rs | 24 +++++- crates/orchestrator/src/database/mod.rs | 3 + .../orchestrator/src/database/mongodb/mod.rs | 23 ++++- crates/orchestrator/src/jobs/mod.rs | 7 +- crates/orchestrator/src/main.rs | 2 +- crates/orchestrator/src/queue/job_queue.rs | 4 +- crates/orchestrator/src/tests/mod.rs | 1 + crates/orchestrator/src/tests/workers/mod.rs | 85 +++++++++++++++++++ crates/orchestrator/src/workers/snos.rs | 36 +++++++- 13 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 crates/orchestrator/src/tests/workers/mod.rs diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 87818596..29f80c5a 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -26,7 +26,7 @@ jobs: - name: Run llvm-cov run: | - cargo llvm-cov nextest --release --lcov --output-path lcov.info + cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1 - name: Upload coverage to codecov.io uses: codecov/codecov-action@v3 diff --git a/Cargo.lock b/Cargo.lock index d926e2e2..31f01a43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,12 @@ version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ark-ff" version = "0.3.0" @@ -3645,6 +3651,7 @@ dependencies = [ name = "orchestrator" version = "0.1.0" dependencies = [ + "arc-swap", "async-trait", "axum 0.7.5", "axum-macros", @@ -4067,7 +4074,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.58", diff --git a/Cargo.toml b/Cargo.toml index 2f90e85a..0613091c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,3 +38,4 @@ httpmock = { version = "0.7.0" } da-client-interface = { path = "crates/da_clients/da-client-interface" } ethereum-da-client = { path = "crates/da_clients/ethereum" } utils = { path = "crates/utils" } +arc-swap = { version = "1.7.1" } diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 28401584..51271861 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -12,6 +12,7 @@ name = "orchestrator" path = "src/main.rs" [dependencies] +arc-swap = { workspace = true } async-trait = { workspace = true } axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 1e3d81f0..ae051709 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -4,6 +4,7 @@ use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; use crate::utils::env_utils::get_env_var_or_panic; +use arc_swap::{ArcSwap, Guard}; use da_client_interface::DaClient; use da_client_interface::DaConfig; use dotenvy::dotenv; @@ -79,11 +80,28 @@ impl Config { /// The app config. It can be accessed from anywhere inside the service. /// It's initialized only once. -pub static CONFIG: OnceCell = OnceCell::const_new(); +/// We are using `ArcSwap` as it allow us to replace the new `Config` with +/// a new one which is required when running test cases. This approach was +/// inspired from here - https://github.com/matklad/once_cell/issues/127 +pub static CONFIG: OnceCell> = OnceCell::const_new(); /// Returns the app config. Initializes if not already done. -pub async fn config() -> &'static Config { - CONFIG.get_or_init(init_config).await +pub async fn config() -> Guard> { + let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await; + cfg.load() +} + +/// OnceCell only allows us to initialize the config once and that's how it should be on production. +/// However, when running tests, we often want to reinitialize because we want to clear the DB and +/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already +/// stored config inside `ArcSwap` with the new configuration and pool settings. +pub async fn config_force_init(config: Config) { + match CONFIG.get() { + Some(arc) => arc.store(Arc::new(config)), + None => { + CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await; + } + } } /// Builds the DA client based on the environment variable DA_LAYER diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index e9639cf1..beff0903 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -1,4 +1,5 @@ use crate::jobs::types::{JobItem, JobStatus, JobType}; +use ::mongodb::Cursor; use async_trait::async_trait; use color_eyre::Result; use mockall::automock; @@ -33,6 +34,8 @@ pub trait Database: Send + Sync { ) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result>; + async fn get_all_jobs(&self, job_type: JobType) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index b86d4bde..1a4a7215 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -5,11 +5,11 @@ use async_trait::async_trait; use color_eyre::eyre::eyre; use color_eyre::Result; use mongodb::bson::Document; -use mongodb::options::UpdateOptions; +use mongodb::options::{FindOneOptions, UpdateOptions}; use mongodb::{ bson::doc, options::{ClientOptions, ServerApi, ServerApiVersion}, - Client, Collection, + Client, Collection, Cursor, }; use std::collections::HashMap; use uuid::Uuid; @@ -115,4 +115,23 @@ impl Database for MongoDb { self.update_job_optimistically(job, update).await?; Ok(()) } + + async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { + let filter = doc! { + "job_type": mongodb::bson::to_bson(&job_type)?, + }; + let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); + Ok(self + .get_job_collection() + .find_one(filter, find_options) + .await + .expect("Failed to fetch latest job by given job type")) + } + + async fn get_all_jobs(&self, job_type: JobType) -> Result> { + let filter = doc! { + "job_type": mongodb::bson::to_bson(&job_type)?, + }; + Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type")) + } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index f7011180..8a06ceb3 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -61,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa } let job_handler = get_job_handler(&job_type); - let job_item = job_handler.create_job(config, internal_id, metadata).await?; + let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; config.database().create_job(job_item.clone()).await?; add_job_to_process_queue(job_item.id).await?; @@ -90,7 +90,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?; let job_handler = get_job_handler(&job.job_type); - let external_id = job_handler.process_job(config, &job).await?; + let external_id = job_handler.process_job(config.as_ref(), &job).await?; let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; config @@ -122,7 +122,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { } let job_handler = get_job_handler(&job.job_type); - let verification_status = job_handler.verify_job(config, &job).await?; + let verification_status = job_handler.verify_job(config.as_ref(), &job).await?; match verification_status { JobVerificationStatus::Verified => { @@ -170,6 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { fn get_job_handler(job_type: &JobType) -> Box { match job_type { JobType::DataSubmission => Box::new(da_job::DaJob), + JobType::SnosRun => Box::new(da_job::DaJob), _ => unimplemented!("Job type not implemented yet."), } } diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index b6502683..c8a031d8 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -26,7 +26,7 @@ async fn main() { // init consumer init_consumers().await.expect("Failed to init consumers"); - // spawn a thread for each worker + // spawn a thread for each workers // changes in rollup mode - sovereign, validity, validiums etc. // will likely involve changes in these workers as well tokio::spawn(start_cron(Box::new(SnosWorker), 60)); diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 78fbd284..709d3aa8 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -14,8 +14,8 @@ const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; #[derive(Debug, Serialize, Deserialize)] -struct JobQueueMessage { - id: Uuid, +pub struct JobQueueMessage { + pub(crate) id: Uuid, } pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> { diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs index 1da43042..cbd1bc07 100644 --- a/crates/orchestrator/src/tests/mod.rs +++ b/crates/orchestrator/src/tests/mod.rs @@ -7,3 +7,4 @@ pub mod server; pub mod queue; pub mod common; +mod workers; diff --git a/crates/orchestrator/src/tests/workers/mod.rs b/crates/orchestrator/src/tests/workers/mod.rs new file mode 100644 index 00000000..0c990586 --- /dev/null +++ b/crates/orchestrator/src/tests/workers/mod.rs @@ -0,0 +1,85 @@ +use crate::config::config_force_init; +use crate::database::MockDatabase; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::queue::job_queue::JobQueueMessage; +use crate::queue::MockQueueProvider; +use crate::tests::common::init_config; +use crate::workers::snos::SnosWorker; +use crate::workers::Worker; +use da_client_interface::MockDaClient; +use httpmock::MockServer; +use rstest::rstest; +use serde_json::json; +use std::collections::HashMap; +use uuid::Uuid; + +#[rstest] +#[tokio::test] +async fn test_create_job() { + let server = MockServer::start(); + let da_client = MockDaClient::new(); + let mut db = MockDatabase::new(); + let mut queue = MockQueueProvider::new(); + + // Mocking db functions + db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call."); + // Getting jobs for check expectations + for i in 1..6 { + db.expect_get_job_by_internal_id_and_type() + .returning(|_, _| Ok(None)) + .call(&i.to_string(), &JobType::SnosRun) + .expect("Failed to call."); + } + + // Creating jobs expectations + for i in 1..6 { + db.expect_create_job() + .returning(|_| Ok(get_job_item_mock_by_id("1".to_string()))) + .call(get_job_item_mock_by_id(i.to_string())) + .expect("Failed to call"); + } + + // Queue function call simulations + queue + .expect_send_message_to_queue() + .returning(|_, _, _| Ok(())) + .call( + "madara_orchestrator_job_processing_queue".to_string(), + serde_json::to_string(&JobQueueMessage { id: Uuid::new_v4() }).unwrap(), + None, + ) + .expect("Failed to call"); + + // mock block number (madara) : 5 + let rpc_response_block_number = 5; + let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number }); + + let config = + init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await; + config_force_init(config).await; + + // mocking block call + let rpc_block_call_mock = server.mock(|when, then| { + when.path("/").body_contains("starknet_blockNumber"); + then.status(200).body(serde_json::to_vec(&response).unwrap()); + }); + + let snos_worker = SnosWorker {}; + snos_worker.run_worker().await; + + rpc_block_call_mock.assert(); +} + +fn get_job_item_mock_by_id(id: String) -> JobItem { + let uuid = Uuid::new_v4(); + + JobItem { + id: uuid, + internal_id: id.clone(), + job_type: JobType::SnosRun, + status: JobStatus::Created, + external_id: ExternalId::Number(0), + metadata: HashMap::new(), + version: 0, + } +} diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index bdc04169..43575043 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -1,5 +1,11 @@ +use crate::config::config; +use crate::jobs::create_job; +use crate::jobs::types::JobType; use crate::workers::Worker; use async_trait::async_trait; +use starknet::providers::Provider; +use std::collections::HashMap; +use tracing::log; pub struct SnosWorker; @@ -8,7 +14,35 @@ impl Worker for SnosWorker { /// 1. Fetch the latest completed block from the Starknet chain /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks + // TEST : added config temporarily to test async fn run_worker(&self) { - todo!() + let config = config().await; + let provider = config.starknet_client(); + let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc"); + let latest_block_processed_data = config + .database() + .get_latest_job_by_type(JobType::SnosRun) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + let latest_block_processed: u64 = + latest_block_processed_data.parse().expect("Failed to convert block number from JobItem into u64"); + + let block_diff = latest_block_number - latest_block_processed; + + // if all blocks are processed + if block_diff == 0 { + return; + } + + for x in latest_block_processed + 1..latest_block_number + 1 { + create_job(JobType::SnosRun, x.to_string(), HashMap::new()) + .await + .expect("Error : failed to create job for snos workers."); + } + + log::info!("jobs created !!"); } }